Skip to content

Commit

Permalink
Merge branch 'develop' of https://github.com/prysmaticlabs/prysm into…
Browse files Browse the repository at this point in the history
… create-log-file
  • Loading branch information
bharath-123 committed Aug 6, 2023
2 parents 386c186 + 634133f commit 0d787bf
Show file tree
Hide file tree
Showing 22 changed files with 193 additions and 124 deletions.
1 change: 1 addition & 0 deletions beacon-chain/rpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//beacon-chain/operations/synccommittee:go_default_library",
"//beacon-chain/operations/voluntaryexits:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/rpc/core:go_default_library",
"//beacon-chain/rpc/eth/beacon:go_default_library",
"//beacon-chain/rpc/eth/builder:go_default_library",
"//beacon-chain/rpc/eth/debug:go_default_library",
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/rpc/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"errors.go",
"log.go",
"service.go",
"validator.go",
],
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core",
Expand All @@ -20,6 +21,7 @@ go_library(
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/operations/synccommittee:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/sync:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
Expand Down
18 changes: 18 additions & 0 deletions beacon-chain/rpc/core/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package core

import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
opfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
)

type Service struct {
HeadFetcher blockchain.HeadFetcher
GenesisTimeFetcher blockchain.TimeFetcher
SyncChecker sync.Checker
Broadcaster p2p.Broadcaster
SyncCommitteePool synccommittee.Pool
OperationNotifier opfeed.Notifier
}
40 changes: 17 additions & 23 deletions beacon-chain/rpc/core/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@ import (
"context"
"fmt"
"sort"
"time"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/epoch/precompute"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
opfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
Expand Down Expand Up @@ -48,18 +44,21 @@ func (e *AggregateBroadcastFailedError) Error() string {

// ComputeValidatorPerformance reports the validator's latest balance along with other important metrics on
// rewards and penalties throughout its lifecycle in the beacon chain.
func ComputeValidatorPerformance(
func (s *Service) ComputeValidatorPerformance(
ctx context.Context,
req *ethpb.ValidatorPerformanceRequest,
headFetcher blockchain.HeadFetcher,
currSlot primitives.Slot,
) (*ethpb.ValidatorPerformanceResponse, *RpcError) {
headState, err := headFetcher.HeadState(ctx)
if s.SyncChecker.Syncing() {
return nil, &RpcError{Reason: Unavailable, Err: errors.New("Syncing to latest head, not ready to respond")}
}

headState, err := s.HeadFetcher.HeadState(ctx)
if err != nil {
return nil, &RpcError{Err: errors.Wrap(err, "could not get head state"), Reason: Internal}
}
currSlot := s.GenesisTimeFetcher.CurrentSlot()
if currSlot > headState.Slot() {
headRoot, err := headFetcher.HeadRoot(ctx)
headRoot, err := s.HeadFetcher.HeadRoot(ctx)
if err != nil {
return nil, &RpcError{Err: errors.Wrap(err, "could not get head root"), Reason: Internal}
}
Expand Down Expand Up @@ -200,21 +199,18 @@ func ComputeValidatorPerformance(

// SubmitSignedContributionAndProof is called by a sync committee aggregator
// to submit signed contribution and proof object.
func SubmitSignedContributionAndProof(
func (s *Service) SubmitSignedContributionAndProof(
ctx context.Context,
s *ethpb.SignedContributionAndProof,
broadcaster p2p.Broadcaster,
pool synccommittee.Pool,
notifier opfeed.Notifier,
req *ethpb.SignedContributionAndProof,
) *RpcError {
errs, ctx := errgroup.WithContext(ctx)

// Broadcasting and saving contribution into the pool in parallel. As one fail should not affect another.
errs.Go(func() error {
return broadcaster.Broadcast(ctx, s)
return s.Broadcaster.Broadcast(ctx, req)
})

if err := pool.SaveSyncCommitteeContribution(s.Message.Contribution); err != nil {
if err := s.SyncCommitteePool.SaveSyncCommitteeContribution(req.Message.Contribution); err != nil {
return &RpcError{Err: err, Reason: Internal}
}

Expand All @@ -224,22 +220,20 @@ func SubmitSignedContributionAndProof(
return &RpcError{Err: err, Reason: Internal}
}

notifier.OperationFeed().Send(&feed.Event{
s.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: opfeed.SyncCommitteeContributionReceived,
Data: &opfeed.SyncCommitteeContributionReceivedData{
Contribution: s,
Contribution: req,
},
})

return nil
}

// SubmitSignedAggregateSelectionProof verifies given aggregate and proofs and publishes them on appropriate gossipsub topic.
func SubmitSignedAggregateSelectionProof(
func (s *Service) SubmitSignedAggregateSelectionProof(
ctx context.Context,
req *ethpb.SignedAggregateSubmitRequest,
genesisTime time.Time,
broadcaster p2p.Broadcaster,
) *RpcError {
if req.SignedAggregateAndProof == nil || req.SignedAggregateAndProof.Message == nil ||
req.SignedAggregateAndProof.Message.Aggregate == nil || req.SignedAggregateAndProof.Message.Aggregate.Data == nil {
Expand All @@ -253,11 +247,11 @@ func SubmitSignedAggregateSelectionProof(

// As a preventive measure, a beacon node shouldn't broadcast an attestation whose slot is out of range.
if err := helpers.ValidateAttestationTime(req.SignedAggregateAndProof.Message.Aggregate.Data.Slot,
genesisTime, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
s.GenesisTimeFetcher.GenesisTime(), params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
return &RpcError{Err: errors.New("attestation slot is no longer valid from current time"), Reason: BadRequest}
}

if err := broadcaster.Broadcast(ctx, req.SignedAggregateAndProof); err != nil {
if err := s.Broadcaster.Broadcast(ctx, req.SignedAggregateAndProof); err != nil {
return &RpcError{Err: &AggregateBroadcastFailedError{err: err}, Reason: Internal}
}

Expand Down
1 change: 1 addition & 0 deletions beacon-chain/rpc/eth/validator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ go_test(
"//beacon-chain/operations/attestations:go_default_library",
"//beacon-chain/operations/synccommittee:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/rpc/core:go_default_library",
"//beacon-chain/rpc/prysm/v1alpha1/validator:go_default_library",
"//beacon-chain/rpc/testutil:go_default_library",
"//beacon-chain/state:go_default_library",
Expand Down
8 changes: 2 additions & 6 deletions beacon-chain/rpc/eth/validator/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *Server) SubmitContributionAndProofs(w http.ResponseWriter, r *http.Requ
http2.HandleError(w, "Could not convert request contribution to consensus contribution: "+err.Error(), http.StatusBadRequest)
return
}
rpcError := core.SubmitSignedContributionAndProof(r.Context(), consensusItem, s.Broadcaster, s.SyncCommitteePool, s.OperationNotifier)
rpcError := s.CoreService.SubmitSignedContributionAndProof(r.Context(), consensusItem)
if rpcError != nil {
http2.HandleError(w, rpcError.Err.Error(), core.ErrorReasonToHTTP(rpcError.Reason))
}
Expand All @@ -131,20 +131,16 @@ func (s *Server) SubmitAggregateAndProofs(w http.ResponseWriter, r *http.Request
return
}

genesisTime := s.TimeFetcher.GenesisTime()

broadcastFailed := false
for _, item := range req.Data {
consensusItem, err := item.ToConsensus()
if err != nil {
http2.HandleError(w, "Could not convert request aggregate to consensus aggregate: "+err.Error(), http.StatusBadRequest)
return
}
rpcError := core.SubmitSignedAggregateSelectionProof(
rpcError := s.CoreService.SubmitSignedAggregateSelectionProof(
r.Context(),
&ethpbalpha.SignedAggregateSubmitRequest{SignedAggregateAndProof: consensusItem},
genesisTime,
s.Broadcaster,
)
if rpcError != nil {
_, ok := rpcError.Err.(*core.AggregateBroadcastFailedError)
Expand Down
33 changes: 20 additions & 13 deletions beacon-chain/rpc/eth/validator/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
p2pmock "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/crypto/bls"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
Expand Down Expand Up @@ -287,14 +288,16 @@ func TestGetAggregateAttestation_SameSlotAndRoot_ReturnMostAggregationBits(t *te
}

func TestSubmitContributionAndProofs(t *testing.T) {
s := &Server{
c := &core.Service{
OperationNotifier: (&mockChain.ChainService{}).OperationNotifier(),
}

s := &Server{CoreService: c}

t.Run("single", func(t *testing.T) {
broadcaster := &p2pmock.MockBroadcaster{}
s.Broadcaster = broadcaster
s.SyncCommitteePool = synccommittee.NewStore()
c.Broadcaster = broadcaster
c.SyncCommitteePool = synccommittee.NewStore()

var body bytes.Buffer
_, err := body.WriteString(singleContribution)
Expand All @@ -306,15 +309,15 @@ func TestSubmitContributionAndProofs(t *testing.T) {
s.SubmitContributionAndProofs(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, 1, len(broadcaster.BroadcastMessages))
contributions, err := s.SyncCommitteePool.SyncCommitteeContributions(1)
contributions, err := c.SyncCommitteePool.SyncCommitteeContributions(1)
require.NoError(t, err)
assert.Equal(t, 1, len(contributions))
})

t.Run("multiple", func(t *testing.T) {
broadcaster := &p2pmock.MockBroadcaster{}
s.Broadcaster = broadcaster
s.SyncCommitteePool = synccommittee.NewStore()
c.Broadcaster = broadcaster
c.SyncCommitteePool = synccommittee.NewStore()

var body bytes.Buffer
_, err := body.WriteString(multipleContributions)
Expand All @@ -326,13 +329,13 @@ func TestSubmitContributionAndProofs(t *testing.T) {
s.SubmitContributionAndProofs(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, 2, len(broadcaster.BroadcastMessages))
contributions, err := s.SyncCommitteePool.SyncCommitteeContributions(1)
contributions, err := c.SyncCommitteePool.SyncCommitteeContributions(1)
require.NoError(t, err)
assert.Equal(t, 2, len(contributions))
})

t.Run("invalid", func(t *testing.T) {
s.SyncCommitteePool = synccommittee.NewStore()
c.SyncCommitteePool = synccommittee.NewStore()

var body bytes.Buffer
_, err := body.WriteString(invalidContribution)
Expand All @@ -349,7 +352,7 @@ func TestSubmitContributionAndProofs(t *testing.T) {
})

t.Run("no body", func(t *testing.T) {
s.SyncCommitteePool = synccommittee.NewStore()
c.SyncCommitteePool = synccommittee.NewStore()

request := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
writer := httptest.NewRecorder()
Expand All @@ -365,13 +368,17 @@ func TestSubmitContributionAndProofs(t *testing.T) {
}

func TestSubmitAggregateAndProofs(t *testing.T) {
c := &core.Service{
GenesisTimeFetcher: &mockChain.ChainService{},
}

s := &Server{
TimeFetcher: &mockChain.ChainService{},
CoreService: c,
}

t.Run("single", func(t *testing.T) {
broadcaster := &p2pmock.MockBroadcaster{}
s.Broadcaster = broadcaster
c.Broadcaster = broadcaster

var body bytes.Buffer
_, err := body.WriteString(singleAggregate)
Expand All @@ -387,8 +394,8 @@ func TestSubmitAggregateAndProofs(t *testing.T) {

t.Run("multiple", func(t *testing.T) {
broadcaster := &p2pmock.MockBroadcaster{}
s.Broadcaster = broadcaster
s.SyncCommitteePool = synccommittee.NewStore()
c.Broadcaster = broadcaster
c.SyncCommitteePool = synccommittee.NewStore()

var body bytes.Buffer
_, err := body.WriteString(multipleAggregates)
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/rpc/eth/validator/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/lookup"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
Expand All @@ -32,4 +33,5 @@ type Server struct {
BeaconDB db.HeadAccessDatabase
BlockBuilder builder.BlockBuilder
OperationNotifier operation.Notifier
CoreService *core.Service
}
1 change: 1 addition & 0 deletions beacon-chain/rpc/prysm/v1alpha1/beacon/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ go_test(
"//beacon-chain/operations/attestations:go_default_library",
"//beacon-chain/operations/slashings:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/rpc/core:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/rpc/prysm/v1alpha1/beacon/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/slashings"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
Expand Down Expand Up @@ -47,4 +48,5 @@ type Server struct {
SyncChecker sync.Checker
ReplayerBuilder stategen.ReplayerBuilder
OptimisticModeFetcher blockchain.OptimisticModeFetcher
CoreService *core.Service
}
6 changes: 1 addition & 5 deletions beacon-chain/rpc/prysm/v1alpha1/beacon/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,11 +659,7 @@ func (bs *Server) GetValidatorQueue(
func (bs *Server) GetValidatorPerformance(
ctx context.Context, req *ethpb.ValidatorPerformanceRequest,
) (*ethpb.ValidatorPerformanceResponse, error) {
if bs.SyncChecker.Syncing() {
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
currSlot := bs.GenesisTimeFetcher.CurrentSlot()
response, err := core.ComputeValidatorPerformance(ctx, req, bs.HeadFetcher, currSlot)
response, err := bs.CoreService.ComputeValidatorPerformance(ctx, req)
if err != nil {
return nil, status.Errorf(core.ErrorReasonToGRPC(err.Reason), "Could not compute validator performance: %v", err.Err)
}
Expand Down
Loading

0 comments on commit 0d787bf

Please sign in to comment.