-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prysm V4: SSE api adding payload_attributes #12102
Changes from all commits
bd4d7b1
9a72e84
2132914
07e5870
869628b
c6d266a
f523c7c
cff5188
78993b8
09c82ca
e1db823
df8f7d4
a2e5e52
16fa737
16b0aa8
9150726
1879d68
58fd783
1a4d8a0
ed02d0c
81d4b80
94a591d
a7a0c77
2ec7bf5
71f6804
8a732a3
4f02831
f3afdb9
90b2b15
60d6c98
74f20eb
c18d773
9e023d6
32e60ec
9c1fa07
c1ac157
346ba0f
2fb5c59
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,9 +9,15 @@ import ( | |
blockfeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/block" | ||
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/operation" | ||
statefeed "github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed/state" | ||
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/helpers" | ||
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/time" | ||
enginev1 "github.com/prysmaticlabs/prysm/v3/proto/engine/v1" | ||
ethpbservice "github.com/prysmaticlabs/prysm/v3/proto/eth/service" | ||
ethpb "github.com/prysmaticlabs/prysm/v3/proto/eth/v1" | ||
"github.com/prysmaticlabs/prysm/v3/proto/migration" | ||
"github.com/prysmaticlabs/prysm/v3/runtime/version" | ||
"github.com/prysmaticlabs/prysm/v3/time/slots" | ||
log "github.com/sirupsen/logrus" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
"google.golang.org/protobuf/proto" | ||
|
@@ -35,6 +41,8 @@ const ( | |
SyncCommitteeContributionTopic = "contribution_and_proof" | ||
// BLSToExecutionChangeTopic represents a new received BLS to execution change event topic. | ||
BLSToExecutionChangeTopic = "bls_to_execution_change" | ||
// PayloadAttributesTopic represents a new payload attributes for execution payload building event topic. | ||
PayloadAttributesTopic = "payload_attributes" | ||
) | ||
|
||
var casesHandled = map[string]bool{ | ||
|
@@ -46,6 +54,7 @@ var casesHandled = map[string]bool{ | |
ChainReorgTopic: true, | ||
SyncCommitteeContributionTopic: true, | ||
BLSToExecutionChangeTopic: true, | ||
PayloadAttributesTopic: true, | ||
} | ||
|
||
// StreamEvents allows requesting all events from a set of topics defined in the Ethereum consensus API standard. | ||
|
@@ -95,7 +104,7 @@ func (s *Server) StreamEvents( | |
return status.Errorf(codes.Internal, "Could not handle block operations event: %v", err) | ||
} | ||
case event := <-stateChan: | ||
if err := handleStateEvents(stream, requestedTopics, event); err != nil { | ||
if err := s.handleStateEvents(stream, requestedTopics, event); err != nil { | ||
return status.Errorf(codes.Internal, "Could not handle state event: %v", err) | ||
} | ||
case <-s.Ctx.Done(): | ||
|
@@ -191,24 +200,31 @@ func handleBlockOperationEvents( | |
} | ||
v2Change := migration.V1Alpha1SignedBLSToExecChangeToV2(changeData.Change) | ||
return streamData(stream, BLSToExecutionChangeTopic, v2Change) | ||
|
||
default: | ||
return nil | ||
} | ||
} | ||
|
||
func handleStateEvents( | ||
func (s *Server) handleStateEvents( | ||
stream ethpbservice.Events_StreamEventsServer, requestedTopics map[string]bool, event *feed.Event, | ||
) error { | ||
switch event.Type { | ||
case statefeed.NewHead: | ||
if _, ok := requestedTopics[HeadTopic]; !ok { | ||
return nil | ||
if _, ok := requestedTopics[HeadTopic]; ok { | ||
head, ok := event.Data.(*ethpb.EventHead) | ||
if !ok { | ||
return nil | ||
} | ||
return streamData(stream, HeadTopic, head) | ||
} | ||
head, ok := event.Data.(*ethpb.EventHead) | ||
if !ok { | ||
if _, ok := requestedTopics[PayloadAttributesTopic]; ok { | ||
if err := s.streamPayloadAttributes(stream); err != nil { | ||
log.WithError(err).Error("Unable to obtain stream payload attributes") | ||
} | ||
return nil | ||
} | ||
return streamData(stream, HeadTopic, head) | ||
return nil | ||
case statefeed.FinalizedCheckpoint: | ||
if _, ok := requestedTopics[FinalizedCheckpointTopic]; !ok { | ||
return nil | ||
|
@@ -232,6 +248,82 @@ func handleStateEvents( | |
} | ||
} | ||
|
||
// streamPayloadAttributes on new head event. | ||
// This event stream is intended to be used by builders and relays. | ||
func (s *Server) streamPayloadAttributes(stream ethpbservice.Events_StreamEventsServer) error { | ||
headState, err := s.HeadFetcher.HeadStateReadOnly(s.Ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
headBlock, err := s.HeadFetcher.HeadBlock(s.Ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
headRoot, err := s.HeadFetcher.HeadRoot(s.Ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
headPayload, err := headBlock.Block().Body().Execution() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
t, err := slots.ToTime(uint64(headState.GenesisTime()), headState.Slot()) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
prevRando, err := helpers.RandaoMix(headState, time.CurrentEpoch(headState)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
switch headState.Version() { | ||
case version.Bellatrix: | ||
return streamData(stream, PayloadAttributesTopic, ðpb.EventPayloadAttributeV1{ | ||
Version: version.String(headState.Version()), | ||
Data: ðpb.EventPayloadAttributeV1_BasePayloadAttribute{ | ||
ProposerIndex: headBlock.Block().ProposerIndex(), | ||
ProposalSlot: headState.Slot(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've seen this as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
EDIT: this is wrong
terencechain marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ParentBlockNumber: headPayload.BlockNumber(), | ||
ParentBlockRoot: headRoot, | ||
ParentBlockHash: headPayload.BlockHash(), | ||
PayloadAttributes: &enginev1.PayloadAttributes{ | ||
Timestamp: uint64(t.Unix()), | ||
PrevRandao: prevRando, | ||
SuggestedFeeRecipient: headPayload.FeeRecipient(), | ||
}, | ||
}, | ||
}) | ||
case version.Capella: | ||
withdrawals, err := headState.ExpectedWithdrawals() | ||
if err != nil { | ||
return err | ||
} | ||
return streamData(stream, PayloadAttributesTopic, ðpb.EventPayloadAttributeV2{ | ||
Version: version.String(headState.Version()), | ||
Data: ðpb.EventPayloadAttributeV2_BasePayloadAttribute{ | ||
ProposerIndex: headBlock.Block().ProposerIndex(), | ||
ProposalSlot: headState.Slot(), | ||
ParentBlockNumber: headPayload.BlockNumber(), | ||
ParentBlockRoot: headRoot, | ||
ParentBlockHash: headPayload.BlockHash(), | ||
PayloadAttributesV2: &enginev1.PayloadAttributesV2{ | ||
Timestamp: uint64(t.Unix()), | ||
PrevRandao: prevRando, | ||
SuggestedFeeRecipient: headPayload.FeeRecipient(), | ||
Withdrawals: withdrawals, | ||
}, | ||
}, | ||
}) | ||
default: | ||
return errors.New("payload version is not supported") | ||
} | ||
} | ||
|
||
func streamData(stream ethpbservice.Events_StreamEventsServer, name string, data proto.Message) error { | ||
returnData, err := anypb.New(data) | ||
if err != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be the proposer index of
headState.slot + 1
. We want to advance the state by 1 and useBeaconProposerIndex
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked the API again. This is what it says
Based on that, I'm taking it as
headState.slot
. Your current implementation is right