Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prysm V4: SSE api adding payload_attributes #12102

Merged
merged 38 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
bd4d7b1
wip implementation
james-prysm Mar 8, 2023
9a72e84
wip implementation
james-prysm Mar 8, 2023
2132914
WIP change
james-prysm Mar 8, 2023
07e5870
wip proto
james-prysm Mar 8, 2023
869628b
wip proto
james-prysm Mar 8, 2023
c6d266a
added changes for events to notify Payload changes
james-prysm Mar 8, 2023
f523c7c
Merge branch 'develop' into payload-attribute-events-api
james-prysm Mar 8, 2023
cff5188
Merge branch 'develop' into payload-attribute-events-api
james-prysm Mar 9, 2023
78993b8
updating event payload proto type
james-prysm Mar 10, 2023
09c82ca
updating events proto
james-prysm Mar 10, 2023
e1db823
fixing events type to oneof
james-prysm Mar 10, 2023
df8f7d4
fixing broken payload
james-prysm Mar 10, 2023
a2e5e52
reverting back to using different event messages
james-prysm Mar 10, 2023
16fa737
updating event logic to handle different payload types
james-prysm Mar 11, 2023
16b0aa8
wip edits
james-prysm Mar 13, 2023
9150726
wip edits
james-prysm Mar 13, 2023
1879d68
removing unused function
james-prysm Mar 13, 2023
58fd783
adding terence's suggestions
james-prysm Mar 14, 2023
1a4d8a0
Merge branch 'develop' into payload-attribute-events-api
james-prysm Mar 14, 2023
ed02d0c
reverting some changes
james-prysm Mar 14, 2023
81d4b80
reverting more logic
james-prysm Mar 14, 2023
94a591d
reverting addition of line
james-prysm Mar 14, 2023
a7a0c77
fixing linting
james-prysm Mar 14, 2023
2ec7bf5
Merge branch 'develop' into payload-attribute-events-api
james-prysm Mar 14, 2023
71f6804
trying to fix linter
james-prysm Mar 14, 2023
8a732a3
fixing log
james-prysm Mar 14, 2023
4f02831
updating log recommendations
james-prysm Mar 14, 2023
f3afdb9
Merge branch 'develop' into payload-attribute-events-api
james-prysm Mar 14, 2023
90b2b15
Merge branch 'develop' into payload-attribute-events-api
james-prysm Mar 14, 2023
60d6c98
wip unit tests
james-prysm Mar 14, 2023
74f20eb
WIP fixes on review
james-prysm Mar 14, 2023
c18d773
updating generated file
james-prysm Mar 14, 2023
9e023d6
WIP unit test changes
james-prysm Mar 14, 2023
32e60ec
adding unit tests for stream
james-prysm Mar 14, 2023
9c1fa07
updating items based on review
james-prysm Mar 14, 2023
c1ac157
fixing linting
james-prysm Mar 14, 2023
346ba0f
removing extra line
james-prysm Mar 14, 2023
2fb5c59
Merge branch 'develop' into payload-attribute-events-api
james-prysm Mar 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions beacon-chain/blockchain/execution_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *notifyForkcho

nextSlot := s.CurrentSlot() + 1 // Cache payload ID for next slot proposer.
hasAttr, attr, proposerId := s.getPayloadAttribute(ctx, arg.headState, nextSlot)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove these lines?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will add it back in, was reverting some changes removed extra lines

payloadID, lastValidHash, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, attr)
if err != nil {
switch err {
Expand Down Expand Up @@ -257,7 +256,6 @@ func (s *Service) getPayloadAttribute(ctx context.Context, st state.BeaconState,
if !ok { // There's no need to build attribute if there is no proposer for slot.
return false, emptyAttri, 0
}

// Get previous randao.
st = st.Copy()
st, err := transition.ProcessSlotsIfPossible(ctx, st, slot)
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/rpc/apimiddleware/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/eth/v2:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_r3labs_sse//:go_default_library",
Expand Down
19 changes: 19 additions & 0 deletions beacon-chain/rpc/apimiddleware/custom_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/v3/api/gateway/apimiddleware"
"github.com/prysmaticlabs/prysm/v3/api/grpc"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/eth/events"
"github.com/prysmaticlabs/prysm/v3/runtime/version"
"github.com/r3labs/sse"
)

Expand Down Expand Up @@ -364,6 +366,10 @@ func handleEvents(m *apimiddleware.ApiProxyMiddleware, _ apimiddleware.Endpoint,
return true
}

type dataSubset struct {
Version string `json:"version"`
}

func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http.Request) apimiddleware.ErrorJson {
for {
select {
Expand Down Expand Up @@ -418,6 +424,19 @@ func receiveEvents(eventChan <-chan *sse.Event, w http.ResponseWriter, req *http
data = &SignedContributionAndProofJson{}
case events.BLSToExecutionChangeTopic:
data = &SignedBLSToExecutionChangeJson{}
case events.PayloadAttributesTopic:
dataSubset := &dataSubset{}
if err := json.Unmarshal(msg.Data, dataSubset); err != nil {
return apimiddleware.InternalServerError(err)
}
switch dataSubset.Version {
case version.String(version.Capella):
data = &EventPayloadAttributeStreamV2Json{}
case version.String(version.Bellatrix):
data = &EventPayloadAttributeStreamV1Json{}
default:
return apimiddleware.InternalServerError(errors.New("payload version unsupported"))
}
case "error":
data = &EventErrorJson{}
default:
Expand Down
41 changes: 41 additions & 0 deletions beacon-chain/rpc/apimiddleware/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,47 @@ type EventChainReorgJson struct {
ExecutionOptimistic bool `json:"execution_optimistic"`
}

type EventPayloadAttributeStreamV1Json struct {
Version string `json:"version"`
Data *EventPayloadAttributeV1Json
}

type EventPayloadAttributeStreamV2Json struct {
Version string `json:"version"`
Data *EventPayloadAttributeV2Json
}

type EventPayloadAttributeV1Json struct {
ProposerIndex string `json:"proposer_index"`
ProposalSlot string `json:"proposal_slot"`
ParentBlockNumber string `json:"parent_block_number"`
ParentBlockRoot string `json:"parent_block_root" hex:"true"`
ParentBlockHash string `json:"parent_block_hash" hex:"true"`
PayloadAttributes *PayloadAttributesV1Json `json:"payload_attributes"`
}

type EventPayloadAttributeV2Json struct {
ProposerIndex string `json:"proposer_index"`
ProposalSlot string `json:"proposal_slot"`
ParentBlockNumber string `json:"parent_block_number"`
ParentBlockRoot string `json:"parent_block_root" hex:"true"`
ParentBlockHash string `json:"parent_block_hash" hex:"true"`
PayloadAttributes *PayloadAttributesV2Json `json:"payload_attributes_v2"`
}

type PayloadAttributesV1Json struct {
Timestamp string `json:"timestamp"`
Random string `json:"prev_randao" hex:"true"`
SuggestedFeeRecipient string `json:"suggested_fee_recipient" hex:"true"`
}

type PayloadAttributesV2Json struct {
Timestamp string `json:"timestamp"`
Random string `json:"prev_randao" hex:"true"`
SuggestedFeeRecipient string `json:"suggested_fee_recipient" hex:"true"`
Withdrawals []*WithdrawalJson `json:"withdrawals"`
}

// ---------------
// Error handling.
// ---------------
Expand Down
7 changes: 7 additions & 0 deletions beacon-chain/rpc/eth/events/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,22 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v3/beacon-chain/rpc/eth/events",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/eth/service:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/migration:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_grpc_ecosystem_grpc_gateway_v2//proto/gateway:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
Expand Down
107 changes: 100 additions & 7 deletions beacon-chain/rpc/eth/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@ import (
blockfeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/block"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/time"
enginev1 "github.com/prysmaticlabs/prysm/v3/proto/engine/v1"
ethpbservice "github.com/prysmaticlabs/prysm/v3/proto/eth/service"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/eth/v1"
"github.com/prysmaticlabs/prysm/v3/proto/migration"
"github.com/prysmaticlabs/prysm/v3/runtime/version"
"github.com/prysmaticlabs/prysm/v3/time/slots"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand All @@ -35,6 +41,8 @@ const (
SyncCommitteeContributionTopic = "contribution_and_proof"
// BLSToExecutionChangeTopic represents a new received BLS to execution change event topic.
BLSToExecutionChangeTopic = "bls_to_execution_change"
// PayloadAttributesTopic represents a new payload attributes for execution payload building event topic.
PayloadAttributesTopic = "payload_attributes"
)

var casesHandled = map[string]bool{
Expand All @@ -46,6 +54,7 @@ var casesHandled = map[string]bool{
ChainReorgTopic: true,
SyncCommitteeContributionTopic: true,
BLSToExecutionChangeTopic: true,
PayloadAttributesTopic: true,
}

// StreamEvents allows requesting all events from a set of topics defined in the Ethereum consensus API standard.
Expand Down Expand Up @@ -95,7 +104,7 @@ func (s *Server) StreamEvents(
return status.Errorf(codes.Internal, "Could not handle block operations event: %v", err)
}
case event := <-stateChan:
if err := handleStateEvents(stream, requestedTopics, event); err != nil {
if err := s.handleStateEvents(stream, requestedTopics, event); err != nil {
return status.Errorf(codes.Internal, "Could not handle state event: %v", err)
}
case <-s.Ctx.Done():
Expand Down Expand Up @@ -191,24 +200,31 @@ func handleBlockOperationEvents(
}
v2Change := migration.V1Alpha1SignedBLSToExecChangeToV2(changeData.Change)
return streamData(stream, BLSToExecutionChangeTopic, v2Change)

default:
return nil
}
}

func handleStateEvents(
func (s *Server) handleStateEvents(
stream ethpbservice.Events_StreamEventsServer, requestedTopics map[string]bool, event *feed.Event,
) error {
switch event.Type {
case statefeed.NewHead:
if _, ok := requestedTopics[HeadTopic]; !ok {
return nil
if _, ok := requestedTopics[HeadTopic]; ok {
head, ok := event.Data.(*ethpb.EventHead)
if !ok {
return nil
}
return streamData(stream, HeadTopic, head)
}
head, ok := event.Data.(*ethpb.EventHead)
if !ok {
if _, ok := requestedTopics[PayloadAttributesTopic]; ok {
if err := s.streamPayloadAttributes(stream); err != nil {
log.WithError(err).Error("Unable to obtain stream payload attributes")
}
return nil
}
return streamData(stream, HeadTopic, head)
return nil
case statefeed.FinalizedCheckpoint:
if _, ok := requestedTopics[FinalizedCheckpointTopic]; !ok {
return nil
Expand All @@ -232,6 +248,83 @@ func handleStateEvents(
}
}

// notifyPayloadAttributesStream on successful FCU notify the event stream that a payload was sent
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably want to update this comment

func (s *Server) streamPayloadAttributes(stream ethpbservice.Events_StreamEventsServer) error {
headState, err := s.HeadFetcher.HeadStateReadOnly(s.Ctx)
if err != nil {
return err
}

headBlock, err := s.HeadFetcher.HeadBlock(s.Ctx)
if err != nil {
return err
}

headRoot, err := s.HeadFetcher.HeadRoot(s.Ctx)
if err != nil {

return err
}

headPayload, err := headBlock.Block().Body().Execution()
if err != nil {

return err
}

t, err := slots.ToTime(uint64(headState.GenesisTime()), headState.Slot())
if err != nil {
return err
}

prevRando, err := helpers.RandaoMix(headState, time.CurrentEpoch(headState))
if err != nil {
return err
}

switch headState.Version() {
case version.Bellatrix:
return streamData(stream, PayloadAttributesTopic, &ethpb.EventPayloadAttributeV1{
Version: version.String(headState.Version()),
Data: &ethpb.EventPayloadAttributeV1_BasePayloadAttribute{
ProposerIndex: headBlock.Block().ProposerIndex(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be the proposer index of headState.slot + 1. We want to advance the state by 1 and use BeaconProposerIndex

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the API again. This is what it says

 - `proposal_slot`: the slot at which a block using these payload attributes may be built.

Based on that, I'm taking it as headState.slot. Your current implementation is right

ProposalSlot: headState.Slot(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've seen this as s.CurrentSlot +1 on others but since the headState is saved in this case i think it's headState.Slot()

Copy link
Member

@terencechain terencechain Mar 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

headState.Slot() feels a little useless to me. I think it should be +1 but best to check with others

EDIT: this is wrong

terencechain marked this conversation as resolved.
Show resolved Hide resolved
ParentBlockNumber: headBlock.Block().Slot(),
ParentBlockRoot: headRoot,
ParentBlockHash: headPayload.BlockHash(),
PayloadAttributes: &enginev1.PayloadAttributes{
Timestamp: uint64(t.Unix()),
PrevRandao: prevRando,
SuggestedFeeRecipient: headPayload.FeeRecipient(),
},
},
})
case version.Capella:
withdrawals, err := headState.ExpectedWithdrawals()
if err != nil {
return err
}
return streamData(stream, PayloadAttributesTopic, &ethpb.EventPayloadAttributeV2{
Version: version.String(headState.Version()),
Data: &ethpb.EventPayloadAttributeV2_BasePayloadAttribute{
ProposerIndex: headBlock.Block().ProposerIndex(),
ProposalSlot: headState.Slot(),
ParentBlockNumber: headBlock.Block().Slot(),
ParentBlockRoot: headRoot,
ParentBlockHash: headPayload.BlockHash(),
PayloadAttributesV2: &enginev1.PayloadAttributesV2{
Timestamp: uint64(t.Unix()),
PrevRandao: prevRando,
SuggestedFeeRecipient: headPayload.FeeRecipient(),
Withdrawals: withdrawals,
},
},
})
default:
return errors.New("payload version is not supported")
}
}

func streamData(stream ethpbservice.Events_StreamEventsServer, name string, data proto.Message) error {
returnData, err := anypb.New(data)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/rpc/eth/events/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package events
import (
"context"

"github.com/prysmaticlabs/prysm/v3/beacon-chain/blockchain"
blockfeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/block"
opfeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/state"
Expand All @@ -18,4 +19,5 @@ type Server struct {
StateNotifier statefeed.Notifier
BlockNotifier blockfeed.Notifier
OperationNotifier opfeed.Notifier
HeadFetcher blockchain.HeadFetcher
}
1 change: 1 addition & 0 deletions beacon-chain/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ func (s *Service) Start() {
StateNotifier: s.cfg.StateNotifier,
BlockNotifier: s.cfg.BlockNotifier,
OperationNotifier: s.cfg.OperationNotifier,
HeadFetcher: s.cfg.HeadFetcher,
})
if s.cfg.EnableDebugRPCEndpoints {
log.Info("Enabled debug gRPC endpoints")
Expand Down
1 change: 1 addition & 0 deletions cmd/beacon-chain/flags/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ var (
Name: "disable-grpc-gateway",
Usage: "Disable the gRPC gateway for JSON-HTTP requests",
}

// GRPCGatewayHost specifies a gRPC gateway host for Prysm.
GRPCGatewayHost = &cli.StringFlag{
Name: "grpc-gateway-host",
Expand Down
4 changes: 4 additions & 0 deletions proto/eth/v1/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ proto_library(
visibility = ["//visibility:public"],
deps = [
"//proto/eth/ext:proto",
"//proto/engine/v1:proto",
"@com_google_protobuf//:descriptor_proto",
"@com_google_protobuf//:timestamp_proto",
],
Expand All @@ -36,6 +37,7 @@ ssz_gen_marshal(
go_proto = ":go_proto",
includes = [
"//consensus-types/primitives:go_default_library",
"//proto/engine/v1:go_default_library",
],
objs = [
"AggregateAttestationAndProof",
Expand Down Expand Up @@ -70,10 +72,12 @@ go_proto_library(
visibility = ["//visibility:public"],
deps = [
"//proto/eth/ext:go_default_library",
"//proto/engine/v1:go_default_library",
"@io_bazel_rules_go//proto/wkt:descriptor_go_proto",
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library", # keep
"//consensus-types/primitives:go_default_library",
"@io_bazel_rules_go//proto/wkt:timestamp_go_proto",
"@org_golang_google_protobuf//types/known/timestamppb:go_default_library",
],
)
Expand Down
Loading