Skip to content

Commit

Permalink
Prysm V4: SSE api adding payload_attributes (#12102)
Browse files Browse the repository at this point in the history
  • Loading branch information
james-prysm authored Mar 15, 2023
1 parent 5c234c8 commit b180a7d
Show file tree
Hide file tree
Showing 11 changed files with 966 additions and 105 deletions.
1 change: 1 addition & 0 deletions beacon-chain/rpc/apimiddleware/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/eth/v2:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_r3labs_sse//:go_default_library",
Expand Down
19 changes: 19 additions & 0 deletions beacon-chain/rpc/apimiddleware/custom_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/api/gateway/apimiddleware"
"github.com/prysmaticlabs/prysm/v3/api/grpc"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/eth/events"
"github.com/prysmaticlabs/prysm/v3/runtime/version"
"github.com/r3labs/sse"
)

Expand Down Expand Up @@ -364,6 +366,10 @@ func handleEvents(m *apimiddleware.ApiProxyMiddleware, _ apimiddleware.Endpoint,
return true
}

type dataSubset struct {
Version string `json:"version"`
}

func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http.Request) apimiddleware.ErrorJson {
for {
select {
Expand Down Expand Up @@ -418,6 +424,19 @@ func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http
data = &SignedContributionAndProofJson{}
case events.BLSToExecutionChangeTopic:
data = &SignedBLSToExecutionChangeJson{}
case events.PayloadAttributesTopic:
dataSubset := &dataSubset{}
if err := json.Unmarshal(msg.Data, dataSubset); err != nil {
return apimiddleware.InternalServerError(err)
}
switch dataSubset.Version {
case version.String(version.Capella):
data = &EventPayloadAttributeStreamV2Json{}
case version.String(version.Bellatrix):
data = &EventPayloadAttributeStreamV1Json{}
default:
return apimiddleware.InternalServerError(errors.New("payload version unsupported"))
}
case "error":
data = &EventErrorJson{}
default:
Expand Down
41 changes: 41 additions & 0 deletions beacon-chain/rpc/apimiddleware/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,47 @@ type EventChainReorgJson struct {
ExecutionOptimistic bool `json:"execution_optimistic"`
}

type EventPayloadAttributeStreamV1Json struct {
Version string `json:"version"`
Data *EventPayloadAttributeV1Json
}

type EventPayloadAttributeStreamV2Json struct {
Version string `json:"version"`
Data *EventPayloadAttributeV2Json
}

type EventPayloadAttributeV1Json struct {
ProposerIndex string `json:"proposer_index"`
ProposalSlot string `json:"proposal_slot"`
ParentBlockNumber string `json:"parent_block_number"`
ParentBlockRoot string `json:"parent_block_root" hex:"true"`
ParentBlockHash string `json:"parent_block_hash" hex:"true"`
PayloadAttributes *PayloadAttributesV1Json `json:"payload_attributes"`
}

type EventPayloadAttributeV2Json struct {
ProposerIndex string `json:"proposer_index"`
ProposalSlot string `json:"proposal_slot"`
ParentBlockNumber string `json:"parent_block_number"`
ParentBlockRoot string `json:"parent_block_root" hex:"true"`
ParentBlockHash string `json:"parent_block_hash" hex:"true"`
PayloadAttributes *PayloadAttributesV2Json `json:"payload_attributes_v2"`
}

type PayloadAttributesV1Json struct {
Timestamp string `json:"timestamp"`
Random string `json:"prev_randao" hex:"true"`
SuggestedFeeRecipient string `json:"suggested_fee_recipient" hex:"true"`
}

type PayloadAttributesV2Json struct {
Timestamp string `json:"timestamp"`
Random string `json:"prev_randao" hex:"true"`
SuggestedFeeRecipient string `json:"suggested_fee_recipient" hex:"true"`
Withdrawals []*WithdrawalJson `json:"withdrawals"`
}

// ---------------
// Error handling.
// ---------------
Expand Down
14 changes: 14 additions & 0 deletions beacon-chain/rpc/eth/events/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,22 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/eth/events",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/eth/service:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/migration:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_grpc_ecosystem_grpc_gateway_v2//proto/gateway:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
Expand All @@ -32,18 +39,25 @@ go_test(
deps = [
"//async/event:go_default_library",
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//config/fieldparams:go_default_library",
"//consensus-types/blocks:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/migration:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",
"//testing/mock:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
"@com_github_grpc_ecosystem_grpc_gateway_v2//proto/gateway:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
Expand Down
106 changes: 99 additions & 7 deletions beacon-chain/rpc/eth/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@ import (
blockfeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/block"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/time"
enginev1 "github.com/prysmaticlabs/prysm/v3/proto/engine/v1"
ethpbservice "github.com/prysmaticlabs/prysm/v3/proto/eth/service"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/eth/v1"
"github.com/prysmaticlabs/prysm/v3/proto/migration"
"github.com/prysmaticlabs/prysm/v3/runtime/version"
"github.com/prysmaticlabs/prysm/v3/time/slots"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand All @@ -35,6 +41,8 @@ const (
SyncCommitteeContributionTopic = "contribution_and_proof"
// BLSToExecutionChangeTopic represents a new received BLS to execution change event topic.
BLSToExecutionChangeTopic = "bls_to_execution_change"
// PayloadAttributesTopic represents a new payload attributes for execution payload building event topic.
PayloadAttributesTopic = "payload_attributes"
)

var casesHandled = map[string]bool{
Expand All @@ -46,6 +54,7 @@ var casesHandled = map[string]bool{
ChainReorgTopic: true,
SyncCommitteeContributionTopic: true,
BLSToExecutionChangeTopic: true,
PayloadAttributesTopic: true,
}

// StreamEvents allows requesting all events from a set of topics defined in the Ethereum consensus API standard.
Expand Down Expand Up @@ -95,7 +104,7 @@ func (s *Server) StreamEvents(
return status.Errorf(codes.Internal, "Could not handle block operations event: %v", err)
}
case event := <-stateChan:
if err := handleStateEvents(stream, requestedTopics, event); err != nil {
if err := s.handleStateEvents(stream, requestedTopics, event); err != nil {
return status.Errorf(codes.Internal, "Could not handle state event: %v", err)
}
case <-s.Ctx.Done():
Expand Down Expand Up @@ -191,24 +200,31 @@ func handleBlockOperationEvents(
}
v2Change := migration.V1Alpha1SignedBLSToExecChangeToV2(changeData.Change)
return streamData(stream, BLSToExecutionChangeTopic, v2Change)

default:
return nil
}
}

func handleStateEvents(
func (s *Server) handleStateEvents(
stream ethpbservice.Events_StreamEventsServer, requestedTopics map[string]bool, event *feed.Event,
) error {
switch event.Type {
case statefeed.NewHead:
if _, ok := requestedTopics[HeadTopic]; !ok {
return nil
if _, ok := requestedTopics[HeadTopic]; ok {
head, ok := event.Data.(*ethpb.EventHead)
if !ok {
return nil
}
return streamData(stream, HeadTopic, head)
}
head, ok := event.Data.(*ethpb.EventHead)
if !ok {
if _, ok := requestedTopics[PayloadAttributesTopic]; ok {
if err := s.streamPayloadAttributes(stream); err != nil {
log.WithError(err).Error("Unable to obtain stream payload attributes")
}
return nil
}
return streamData(stream, HeadTopic, head)
return nil
case statefeed.FinalizedCheckpoint:
if _, ok := requestedTopics[FinalizedCheckpointTopic]; !ok {
return nil
Expand All @@ -232,6 +248,82 @@ func handleStateEvents(
}
}

// streamPayloadAttributes on new head event.
// This event stream is intended to be used by builders and relays.
func (s *Server) streamPayloadAttributes(stream ethpbservice.Events_StreamEventsServer) error {
headState, err := s.HeadFetcher.HeadStateReadOnly(s.Ctx)
if err != nil {
return err
}

headBlock, err := s.HeadFetcher.HeadBlock(s.Ctx)
if err != nil {
return err
}

headRoot, err := s.HeadFetcher.HeadRoot(s.Ctx)
if err != nil {
return err
}

headPayload, err := headBlock.Block().Body().Execution()
if err != nil {
return err
}

t, err := slots.ToTime(uint64(headState.GenesisTime()), headState.Slot())
if err != nil {
return err
}

prevRando, err := helpers.RandaoMix(headState, time.CurrentEpoch(headState))
if err != nil {
return err
}

switch headState.Version() {
case version.Bellatrix:
return streamData(stream, PayloadAttributesTopic, &ethpb.EventPayloadAttributeV1{
Version: version.String(headState.Version()),
Data: &ethpb.EventPayloadAttributeV1_BasePayloadAttribute{
ProposerIndex: headBlock.Block().ProposerIndex(),
ProposalSlot: headState.Slot(),
ParentBlockNumber: headPayload.BlockNumber(),
ParentBlockRoot: headRoot,
ParentBlockHash: headPayload.BlockHash(),
PayloadAttributes: &enginev1.PayloadAttributes{
Timestamp: uint64(t.Unix()),
PrevRandao: prevRando,
SuggestedFeeRecipient: headPayload.FeeRecipient(),
},
},
})
case version.Capella:
withdrawals, err := headState.ExpectedWithdrawals()
if err != nil {
return err
}
return streamData(stream, PayloadAttributesTopic, &ethpb.EventPayloadAttributeV2{
Version: version.String(headState.Version()),
Data: &ethpb.EventPayloadAttributeV2_BasePayloadAttribute{
ProposerIndex: headBlock.Block().ProposerIndex(),
ProposalSlot: headState.Slot(),
ParentBlockNumber: headPayload.BlockNumber(),
ParentBlockRoot: headRoot,
ParentBlockHash: headPayload.BlockHash(),
PayloadAttributesV2: &enginev1.PayloadAttributesV2{
Timestamp: uint64(t.Unix()),
PrevRandao: prevRando,
SuggestedFeeRecipient: headPayload.FeeRecipient(),
Withdrawals: withdrawals,
},
},
})
default:
return errors.New("payload version is not supported")
}
}

func streamData(stream ethpbservice.Events_StreamEventsServer, name string, data proto.Message) error {
returnData, err := anypb.New(data)
if err != nil {
Expand Down
Loading

0 comments on commit b180a7d

Please sign in to comment.