diff --git a/beacon-chain/blockchain/options.go b/beacon-chain/blockchain/options.go index 38492502a1f9..5158f3dbe799 100644 --- a/beacon-chain/blockchain/options.go +++ b/beacon-chain/blockchain/options.go @@ -69,6 +69,14 @@ func WithDepositCache(c cache.DepositCache) Option { } } +// WithPayloadAttestationCache for payload attestation cache. +func WithPayloadAttestationCache(c *cache.PayloadAttestationCache) Option { + return func(s *Service) error { + s.cfg.PayloadAttestationCache = c + return nil + } +} + // WithPayloadIDCache for payload ID cache. func WithPayloadIDCache(c *cache.PayloadIDCache) Option { return func(s *Service) error { diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index a17cb632c6f5..3bb20b5aa198 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -76,6 +76,7 @@ type config struct { ChainStartFetcher execution.ChainStartFetcher BeaconDB db.HeadAccessDatabase DepositCache cache.DepositCache + PayloadAttestationCache *cache.PayloadAttestationCache PayloadIDCache *cache.PayloadIDCache TrackedValidatorsCache *cache.TrackedValidatorsCache AttPool attestations.Pool diff --git a/beacon-chain/cache/BUILD.bazel b/beacon-chain/cache/BUILD.bazel index 8a0b9d7a99f0..df77e1230247 100644 --- a/beacon-chain/cache/BUILD.bazel +++ b/beacon-chain/cache/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "doc.go", "error.go", "interfaces.go", + "payload_attestation.go", "payload_id.go", "proposer_indices.go", "proposer_indices_disabled.go", # keep @@ -42,6 +43,7 @@ go_library( "//config/params:go_default_library", "//consensus-types/primitives:go_default_library", "//container/slice:go_default_library", + "//crypto/bls:go_default_library", "//crypto/hash:go_default_library", "//crypto/rand:go_default_library", "//encoding/bytesutil:go_default_library", @@ -70,6 +72,7 @@ go_test( "checkpoint_state_test.go", "committee_fuzz_test.go", "committee_test.go", + "payload_attestation_test.go", "payload_id_test.go", "private_access_test.go", "proposer_indices_test.go", @@ -88,6 +91,7 @@ go_test( "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/primitives:go_default_library", + "//crypto/bls:go_default_library", "//encoding/bytesutil:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//testing/assert:go_default_library", diff --git a/beacon-chain/cache/payload_attestation.go b/beacon-chain/cache/payload_attestation.go new file mode 100644 index 000000000000..2a921559b2d7 --- /dev/null +++ b/beacon-chain/cache/payload_attestation.go @@ -0,0 +1,116 @@ +package cache + +import ( + "errors" + "sync" + + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v5/crypto/bls" + "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" + eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" +) + +var errNilPayloadAttestationMessage = errors.New("nil Payload Attestation Message") + +// PayloadAttestationCache keeps a map of all the PTC votes that were seen, +// already aggregated. The key is the beacon block root. +type PayloadAttestationCache struct { + root [32]byte + attestations [primitives.PAYLOAD_INVALID_STATUS]*eth.PayloadAttestation + sync.Mutex +} + +// Seen returns true if a vote for the given Beacon Block Root has already been processed +// for this Payload Timeliness Committee index. This will return true even if +// the Payload status differs. +func (p *PayloadAttestationCache) Seen(root [32]byte, idx uint64) bool { + p.Lock() + defer p.Unlock() + if p.root != root { + return false + } + for _, agg := range p.attestations { + if agg == nil { + continue + } + if agg.AggregationBits.BitAt(idx) { + return true + } + } + return false +} + +// messageToPayloadAttestation creates a PayloadAttestation with a single +// aggregated bit from the passed PayloadAttestationMessage +func messageToPayloadAttestation(att *eth.PayloadAttestationMessage, idx uint64) *eth.PayloadAttestation { + bits := primitives.NewPayloadAttestationAggregationBits() + bits.SetBitAt(idx, true) + data := ð.PayloadAttestationData{ + BeaconBlockRoot: bytesutil.SafeCopyBytes(att.Data.BeaconBlockRoot), + Slot: att.Data.Slot, + PayloadStatus: att.Data.PayloadStatus, + } + return ð.PayloadAttestation{ + AggregationBits: bits, + Data: data, + Signature: bytesutil.SafeCopyBytes(att.Signature), + } +} + +// aggregateSigFromMessage returns the aggregated signature from a Payload +// Attestation by adding the passed signature in the PayloadAttestationMessage, +// no signature validation is performed. +func aggregateSigFromMessage(aggregated *eth.PayloadAttestation, message *eth.PayloadAttestationMessage) ([]byte, error) { + aggSig, err := bls.SignatureFromBytesNoValidation(aggregated.Signature) + if err != nil { + return nil, err + } + sig, err := bls.SignatureFromBytesNoValidation(message.Signature) + if err != nil { + return nil, err + } + return bls.AggregateSignatures([]bls.Signature{aggSig, sig}).Marshal(), nil +} + +// Add adds a PayloadAttestationMessage to the internal cache of aggregated +// PayloadAttestations. +// If the index has already been seen for this attestation status the function does nothing. +// If the root is not the cached root, the function will clear the previous cache +// This function assumes that the message has already been validated. In +// particular that the signature is valid and that the block root corresponds to +// the given slot in the attestation data. +func (p *PayloadAttestationCache) Add(att *eth.PayloadAttestationMessage, idx uint64) error { + if att == nil || att.Data == nil || att.Data.BeaconBlockRoot == nil { + return errNilPayloadAttestationMessage + } + p.Lock() + defer p.Unlock() + root := [32]byte(att.Data.BeaconBlockRoot) + if p.root != root { + p.root = root + p.attestations = [primitives.PAYLOAD_INVALID_STATUS]*eth.PayloadAttestation{} + } + agg := p.attestations[att.Data.PayloadStatus] + if agg == nil { + p.attestations[att.Data.PayloadStatus] = messageToPayloadAttestation(att, idx) + return nil + } + if agg.AggregationBits.BitAt(idx) { + return nil + } + sig, err := aggregateSigFromMessage(agg, att) + if err != nil { + return err + } + agg.Signature = sig + agg.AggregationBits.SetBitAt(idx, true) + return nil +} + +// Clear clears the internal map +func (p *PayloadAttestationCache) Clear() { + p.Lock() + defer p.Unlock() + p.root = [32]byte{} + p.attestations = [primitives.PAYLOAD_INVALID_STATUS]*eth.PayloadAttestation{} +} diff --git a/beacon-chain/cache/payload_attestation_test.go b/beacon-chain/cache/payload_attestation_test.go new file mode 100644 index 000000000000..10c274222edf --- /dev/null +++ b/beacon-chain/cache/payload_attestation_test.go @@ -0,0 +1,95 @@ +package cache + +import ( + "testing" + + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v5/crypto/bls" + "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" + eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v5/testing/require" +) + +func TestPayloadAttestationCache(t *testing.T) { + p := &PayloadAttestationCache{} + + //Test Has seen + root := [32]byte{'r'} + idx := uint64(5) + require.Equal(t, false, p.Seen(root, idx)) + + // Test Add + msg := ð.PayloadAttestationMessage{ + Signature: bls.NewAggregateSignature().Marshal(), + Data: ð.PayloadAttestationData{ + BeaconBlockRoot: root[:], + Slot: 1, + PayloadStatus: primitives.PAYLOAD_PRESENT, + }, + } + + // Add new root + require.NoError(t, p.Add(msg, idx)) + require.Equal(t, true, p.Seen(root, idx)) + require.Equal(t, root, p.root) + att := p.attestations[primitives.PAYLOAD_PRESENT] + indices := att.AggregationBits.BitIndices() + require.DeepEqual(t, []int{int(idx)}, indices) + singleSig := bytesutil.SafeCopyBytes(msg.Signature) + require.DeepEqual(t, singleSig, att.Signature) + + // Test Seen + require.Equal(t, true, p.Seen(root, idx)) + require.Equal(t, false, p.Seen(root, idx+1)) + + // Add another attestation on the same data + msg2 := ð.PayloadAttestationMessage{ + Signature: bls.NewAggregateSignature().Marshal(), + Data: att.Data, + } + idx2 := uint64(7) + require.NoError(t, p.Add(msg2, idx2)) + att = p.attestations[primitives.PAYLOAD_PRESENT] + indices = att.AggregationBits.BitIndices() + require.DeepEqual(t, []int{int(idx), int(idx2)}, indices) + require.DeepNotEqual(t, att.Signature, msg.Signature) + + // Try again the same index + require.NoError(t, p.Add(msg2, idx2)) + att2 := p.attestations[primitives.PAYLOAD_PRESENT] + indices = att.AggregationBits.BitIndices() + require.DeepEqual(t, []int{int(idx), int(idx2)}, indices) + require.DeepEqual(t, att, att2) + + // Test Seen + require.Equal(t, true, p.Seen(root, idx2)) + require.Equal(t, false, p.Seen(root, idx2+1)) + + // Add another payload status for a different payload status + msg3 := ð.PayloadAttestationMessage{ + Signature: bls.NewAggregateSignature().Marshal(), + Data: ð.PayloadAttestationData{ + BeaconBlockRoot: root[:], + Slot: 1, + PayloadStatus: primitives.PAYLOAD_WITHHELD, + }, + } + idx3 := uint64(17) + + require.NoError(t, p.Add(msg3, idx3)) + att3 := p.attestations[primitives.PAYLOAD_WITHHELD] + indices3 := att3.AggregationBits.BitIndices() + require.DeepEqual(t, []int{int(idx3)}, indices3) + require.DeepEqual(t, singleSig, att3.Signature) + + // Add a different root + root2 := [32]byte{'s'} + msg.Data.BeaconBlockRoot = root2[:] + require.NoError(t, p.Add(msg, idx)) + require.Equal(t, root2, p.root) + require.Equal(t, true, p.Seen(root2, idx)) + require.Equal(t, false, p.Seen(root, idx)) + att = p.attestations[primitives.PAYLOAD_PRESENT] + indices = att.AggregationBits.BitIndices() + require.DeepEqual(t, []int{int(idx)}, indices) +} diff --git a/beacon-chain/core/helpers/payload_attestation.go b/beacon-chain/core/helpers/payload_attestation.go index 348c419f8f6e..5f626ff01dee 100644 --- a/beacon-chain/core/helpers/payload_attestation.go +++ b/beacon-chain/core/helpers/payload_attestation.go @@ -68,6 +68,21 @@ func ValidateNilPayloadAttestation(att *eth.PayloadAttestation) error { return ValidateNilPayloadAttestationData(att.Data) } +// InPayloadTimelinessCommittee returns whether the given index belongs to the +// PTC computed from the passed state. +func InPayloadTimelinessCommittee(ctx context.Context, state state.ReadOnlyBeaconState, slot primitives.Slot, idx primitives.ValidatorIndex) (bool, error) { + ptc, err := GetPayloadTimelinessCommittee(ctx, state, slot) + if err != nil { + return false, err + } + for _, i := range ptc { + if i == idx { + return true, nil + } + } + return false, nil +} + // GetPayloadTimelinessCommittee returns the PTC for the given slot, computed from the passed state as in the // spec function `get_ptc`. func GetPayloadTimelinessCommittee(ctx context.Context, state state.ReadOnlyBeaconState, slot primitives.Slot) (indices []primitives.ValidatorIndex, err error) { @@ -246,3 +261,36 @@ func IsValidIndexedPayloadAttestation(state state.ReadOnlyBeaconState, att *epbs return signature.FastAggregateVerify(publicKeys, signingRoot), nil } + +// ValidatePayloadAttestationMessageSignature verifies the signature of a +// payload attestation message. +func ValidatePayloadAttestationMessageSignature(ctx context.Context, st state.ReadOnlyBeaconState, msg *eth.PayloadAttestationMessage) error { + if err := ValidateNilPayloadAttestationMessage(msg); err != nil { + return err + } + val, err := st.ValidatorAtIndex(msg.ValidatorIndex) + if err != nil { + return err + } + pub, err := bls.PublicKeyFromBytes(val.PublicKey) + if err != nil { + return err + } + sig, err := bls.SignatureFromBytes(msg.Signature) + if err != nil { + return err + } + currentEpoch := slots.ToEpoch(st.Slot()) + domain, err := signing.Domain(st.Fork(), currentEpoch, params.BeaconConfig().DomainPTCAttester, st.GenesisValidatorsRoot()) + if err != nil { + return err + } + root, err := signing.ComputeSigningRoot(msg.Data, domain) + if err != nil { + return err + } + if !sig.Verify(pub, root[:]) { + return signing.ErrSigFailedToVerify + } + return nil +} diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index d3f3af2ecfbd..e77229a90ff7 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -102,6 +102,7 @@ type BeaconNode struct { blsToExecPool blstoexec.PoolManager depositCache cache.DepositCache trackedValidatorsCache *cache.TrackedValidatorsCache + payloadAttestationCache *cache.PayloadAttestationCache payloadIDCache *cache.PayloadIDCache stateFeed *event.Feed blockFeed *event.Feed @@ -152,6 +153,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco syncCommitteePool: synccommittee.NewPool(), blsToExecPool: blstoexec.NewPool(), trackedValidatorsCache: cache.NewTrackedValidatorsCache(), + payloadAttestationCache: &cache.PayloadAttestationCache{}, payloadIDCache: cache.NewPayloadIDCache(), slasherBlockHeadersFeed: new(event.Feed), slasherAttestationsFeed: new(event.Feed), @@ -775,6 +777,7 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st blockchain.WithSyncComplete(syncComplete), blockchain.WithBlobStorage(b.BlobStorage), blockchain.WithTrackedValidatorsCache(b.trackedValidatorsCache), + blockchain.WithPayloadAttestationCache(b.payloadAttestationCache), blockchain.WithPayloadIDCache(b.payloadIDCache), blockchain.WithSyncChecker(b.syncChecker), ) @@ -852,6 +855,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil regularsync.WithStateGen(b.stateGen), regularsync.WithSlasherAttestationsFeed(b.slasherAttestationsFeed), regularsync.WithSlasherBlockHeadersFeed(b.slasherBlockHeadersFeed), + regularsync.WithPayloadAttestationCache(b.payloadAttestationCache), regularsync.WithPayloadReconstructor(web3Service), regularsync.WithClockWaiter(b.clockWaiter), regularsync.WithInitialSyncComplete(initialSyncComplete), diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index cba397bcf1bf..194176f1d1f6 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "log.go", "metrics.go", "options.go", + "payload_attestations.go", "pending_attestations_queue.go", "pending_blocks_queue.go", "rate_limiter.go", @@ -101,6 +102,7 @@ go_library( "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", + "//consensus-types/epbs/payload-attestation:go_default_library", "//consensus-types/interfaces:go_default_library", "//consensus-types/primitives:go_default_library", "//consensus-types/wrapper:go_default_library", @@ -155,6 +157,7 @@ go_test( "decode_pubsub_test.go", "error_test.go", "fork_watcher_test.go", + "payload_attestations_test.go", "pending_attestations_queue_test.go", "pending_blocks_queue_test.go", "rate_limiter_test.go", @@ -228,6 +231,7 @@ go_test( "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", + "//consensus-types/epbs/payload-attestation:go_default_library", "//consensus-types/interfaces:go_default_library", "//consensus-types/primitives:go_default_library", "//consensus-types/wrapper:go_default_library", @@ -246,6 +250,7 @@ go_test( "//testing/assert:go_default_library", "//testing/require:go_default_library", "//testing/util:go_default_library", + "//testing/util/random:go_default_library", "//time:go_default_library", "//time/slots:go_default_library", "@com_github_d4l3k_messagediff//:go_default_library", diff --git a/beacon-chain/sync/options.go b/beacon-chain/sync/options.go index 9b0281ea667f..30513eb662dd 100644 --- a/beacon-chain/sync/options.go +++ b/beacon-chain/sync/options.go @@ -2,6 +2,7 @@ package sync import ( "github.com/prysmaticlabs/prysm/v5/async/event" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/cache" blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation" statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" @@ -127,6 +128,13 @@ func WithSlasherBlockHeadersFeed(slasherBlockHeadersFeed *event.Feed) Option { } } +func WithPayloadAttestationCache(r *cache.PayloadAttestationCache) Option { + return func(s *Service) error { + s.payloadAttestationCache = r + return nil + } +} + func WithPayloadReconstructor(r execution.PayloadReconstructor) Option { return func(s *Service) error { s.cfg.executionPayloadReconstructor = r diff --git a/beacon-chain/sync/payload_attestations.go b/beacon-chain/sync/payload_attestations.go new file mode 100644 index 000000000000..bcfb860a8bb8 --- /dev/null +++ b/beacon-chain/sync/payload_attestations.go @@ -0,0 +1,115 @@ +package sync + +import ( + "context" + "slices" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" + payloadattestation "github.com/prysmaticlabs/prysm/v5/consensus-types/epbs/payload-attestation" + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v5/monitoring/tracing" + eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "go.opencensus.io/trace" + "google.golang.org/protobuf/proto" +) + +var ( + errInvalidValidatorIndex = errors.New("invalid validator index") + errAlreadySeenPayloadAttestation = errors.New("payload attestation already seen for validator index") +) + +func (s *Service) validatePayloadAttestation(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) { + if pid == s.cfg.p2p.PeerID() { + return pubsub.ValidationAccept, nil + } + if s.cfg.initialSync.Syncing() { + return pubsub.ValidationIgnore, nil + } + ctx, span := trace.StartSpan(ctx, "sync.validatePayloadAttestation") + defer span.End() + if msg.Topic == nil { + return pubsub.ValidationReject, errInvalidTopic + } + m, err := s.decodePubsubMessage(msg) + if err != nil { + tracing.AnnotateError(span, err) + return pubsub.ValidationReject, err + } + att, ok := m.(*eth.PayloadAttestationMessage) + if !ok { + return pubsub.ValidationReject, errWrongMessage + } + pa, err := payloadattestation.NewReadOnly(att) + if err != nil { + log.WithError(err).Error("failed to create read only payload attestation") + return pubsub.ValidationIgnore, err + } + v := s.newPayloadAttestationVerifier(pa, verification.GossipPayloadAttestationMessageRequirements) + + if err := v.VerifyCurrentSlot(); err != nil { + return pubsub.ValidationIgnore, err + } + + if err := v.VerifyPayloadStatus(); err != nil { + return pubsub.ValidationReject, err + } + + if err := v.VerifyBlockRootSeen(s.hasBadBlock); err != nil { + return pubsub.ValidationIgnore, err + } + + if err := v.VerifyBlockRootValid(s.hasBadBlock); err != nil { + return pubsub.ValidationReject, err + } + + st, err := s.cfg.chain.HeadState(ctx) + if err != nil { + return pubsub.ValidationIgnore, err + } + + if err := v.VerifyValidatorInPTC(ctx, st); err != nil { + return pubsub.ValidationReject, err + } + + if err := v.VerifySignature(st); err != nil { + return pubsub.ValidationReject, err + } + + if s.payloadAttestationCache.Seen(pa.BeaconBlockRoot(), uint64(pa.ValidatorIndex())) { + return pubsub.ValidationIgnore, errAlreadySeenPayloadAttestation + } + + return pubsub.ValidationAccept, nil +} + +func (s *Service) payloadAttestationSubscriber(ctx context.Context, msg proto.Message) error { + a, ok := msg.(*eth.PayloadAttestationMessage) + if !ok { + return errWrongMessage + } + if err := helpers.ValidateNilPayloadAttestationMessage(a); err != nil { + return err + } + root := [32]byte(a.Data.BeaconBlockRoot) + st, err := s.cfg.chain.HeadState(ctx) + if err != nil { + return err + } + ptc, err := helpers.GetPayloadTimelinessCommittee(ctx, st, a.Data.Slot) + if err != nil { + return err + } + idx := slices.Index(ptc, a.ValidatorIndex) + if idx == -1 { + return errInvalidValidatorIndex + } + if s.payloadAttestationCache.Seen(root, uint64(primitives.ValidatorIndex(idx))) { + return nil + } + + return s.payloadAttestationCache.Add(a, uint64(idx)) +} diff --git a/beacon-chain/sync/payload_attestations_test.go b/beacon-chain/sync/payload_attestations_test.go new file mode 100644 index 000000000000..1a3f52538eda --- /dev/null +++ b/beacon-chain/sync/payload_attestations_test.go @@ -0,0 +1,162 @@ +package sync + +import ( + "bytes" + "context" + "reflect" + "testing" + "time" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/pkg/errors" + mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/cache" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" + p2ptest "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/startup" + mockSync "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/initial-sync/testing" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" + "github.com/prysmaticlabs/prysm/v5/config/params" + payloadattestation "github.com/prysmaticlabs/prysm/v5/consensus-types/epbs/payload-attestation" + "github.com/prysmaticlabs/prysm/v5/testing/require" + "github.com/prysmaticlabs/prysm/v5/testing/util/random" +) + +func TestValidatePayloadAttestationMessage_IncorrectTopic(t *testing.T) { + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + chainService := &mock.ChainService{Genesis: time.Unix(time.Now().Unix()-int64(params.BeaconConfig().SecondsPerSlot), 0)} + s := &Service{ + payloadAttestationCache: &cache.PayloadAttestationCache{}, + cfg: &config{chain: chainService, p2p: p, initialSync: &mockSync.Sync{}, clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}} + + msg := random.PayloadAttestation(t) // Using payload attestation for message should fail. + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, msg) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] + digest, err := s.currentForkDigest() + require.NoError(t, err) + topic = s.addDigestToTopic(topic, digest) + + result, err := s.validatePayloadAttestation(ctx, "", &pubsub.Message{ + Message: &pb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }}) + require.ErrorContains(t, "extraction failed for topic", err) + require.Equal(t, result, pubsub.ValidationReject) +} + +func TestValidatePayloadAttestationMessage_ErrorPathsWithMock(t *testing.T) { + tests := []struct { + error error + verifier verification.NewPayloadAttestationMsgVerifier + result pubsub.ValidationResult + }{ + { + error: errors.New("incorrect slot"), + verifier: func(pa payloadattestation.ROMessage, reqs []verification.Requirement) verification.PayloadAttestationMsgVerifier { + return &verification.MockPayloadAttestation{ErrIncorrectPayloadAttSlot: errors.New("incorrect slot")} + }, + result: pubsub.ValidationIgnore, + }, + { + error: errors.New("incorrect status"), + verifier: func(pa payloadattestation.ROMessage, reqs []verification.Requirement) verification.PayloadAttestationMsgVerifier { + return &verification.MockPayloadAttestation{ErrIncorrectPayloadAttStatus: errors.New("incorrect status")} + }, + result: pubsub.ValidationReject, + }, + { + error: errors.New("block root seen"), + verifier: func(pa payloadattestation.ROMessage, reqs []verification.Requirement) verification.PayloadAttestationMsgVerifier { + return &verification.MockPayloadAttestation{ErrPayloadAttBlockRootNotSeen: errors.New("block root seen")} + }, + result: pubsub.ValidationIgnore, + }, + { + error: errors.New("block root invalid"), + verifier: func(pa payloadattestation.ROMessage, reqs []verification.Requirement) verification.PayloadAttestationMsgVerifier { + return &verification.MockPayloadAttestation{ErrPayloadAttBlockRootInvalid: errors.New("block root invalid")} + }, + result: pubsub.ValidationReject, + }, + { + error: errors.New("validator not in PTC"), + verifier: func(pa payloadattestation.ROMessage, reqs []verification.Requirement) verification.PayloadAttestationMsgVerifier { + return &verification.MockPayloadAttestation{ErrIncorrectPayloadAttValidator: errors.New("validator not in PTC")} + }, + result: pubsub.ValidationReject, + }, + { + error: errors.New("incorrect signature"), + verifier: func(pa payloadattestation.ROMessage, reqs []verification.Requirement) verification.PayloadAttestationMsgVerifier { + return &verification.MockPayloadAttestation{ErrInvalidMessageSignature: errors.New("incorrect signature")} + }, + result: pubsub.ValidationReject, + }, + } + for _, tt := range tests { + t.Run(tt.error.Error(), func(t *testing.T) { + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + chainService := &mock.ChainService{Genesis: time.Unix(time.Now().Unix()-int64(params.BeaconConfig().SecondsPerSlot), 0)} + s := &Service{ + payloadAttestationCache: &cache.PayloadAttestationCache{}, + cfg: &config{chain: chainService, p2p: p, initialSync: &mockSync.Sync{}, clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}} + s.newPayloadAttestationVerifier = tt.verifier + + msg := random.PayloadAttestationMessage(t) + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, msg) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] + digest, err := s.currentForkDigest() + require.NoError(t, err) + topic = s.addDigestToTopic(topic, digest) + + result, err := s.validatePayloadAttestation(ctx, "", &pubsub.Message{ + Message: &pb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }}) + + require.ErrorContains(t, tt.error.Error(), err) + require.Equal(t, result, tt.result) + }) + } +} + +func TestValidatePayloadAttestationMessage_Accept(t *testing.T) { + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + chainService := &mock.ChainService{Genesis: time.Unix(time.Now().Unix()-int64(params.BeaconConfig().SecondsPerSlot), 0)} + s := &Service{ + payloadAttestationCache: &cache.PayloadAttestationCache{}, + cfg: &config{chain: chainService, p2p: p, initialSync: &mockSync.Sync{}, clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}} + s.newPayloadAttestationVerifier = func(pa payloadattestation.ROMessage, reqs []verification.Requirement) verification.PayloadAttestationMsgVerifier { + return &verification.MockPayloadAttestation{} + } + + msg := random.PayloadAttestationMessage(t) + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, msg) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] + digest, err := s.currentForkDigest() + require.NoError(t, err) + topic = s.addDigestToTopic(topic, digest) + + result, err := s.validatePayloadAttestation(ctx, "", &pubsub.Message{ + Message: &pb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }}) + require.NoError(t, err) + require.Equal(t, result, pubsub.ValidationAccept) +} diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 15196bf6ca74..2bc834d856f8 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -19,6 +19,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/async/abool" "github.com/prysmaticlabs/prysm/v5/async/event" "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/cache" blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation" statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" @@ -124,6 +125,7 @@ type Service struct { seenPendingBlocks map[[32]byte]bool blkRootToPendingAtts map[[32]byte][]ethpb.SignedAggregateAttAndProof subHandler *subTopicHandler + payloadAttestationCache *cache.PayloadAttestationCache pendingAttsLock sync.RWMutex pendingQueueLock sync.RWMutex chainStarted *abool.AtomicBool @@ -156,6 +158,7 @@ type Service struct { initialSyncComplete chan struct{} verifierWaiter *verification.InitializerWaiter newBlobVerifier verification.NewBlobVerifier + newPayloadAttestationVerifier verification.NewPayloadAttestationMsgVerifier availableBlocker coverage.AvailableBlocker ctxMap ContextByteVersions } diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 3b7f7def672c..fefe3ef59688 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -145,6 +145,16 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { params.BeaconConfig().BlobsidecarSubnetCount, ) } + + // New Gossip Topic for ePBS + if epoch >= params.BeaconConfig().EPBSForkEpoch { + s.subscribe( + p2p.PayloadAttestationMessageTopicFormat, + s.validatePayloadAttestation, + s.payloadAttestationSubscriber, + digest, + ) + } } // subscribe to a given topic with a given validator and subscription handler. diff --git a/beacon-chain/verification/interface.go b/beacon-chain/verification/interface.go index e87a5c55bbf8..17adf92b8ca6 100644 --- a/beacon-chain/verification/interface.go +++ b/beacon-chain/verification/interface.go @@ -44,3 +44,7 @@ type PayloadAttestationMsgVerifier interface { // NewBlobVerifier is a function signature that can be used by code that needs to be // able to mock Initializer.NewBlobVerifier without complex setup. type NewBlobVerifier func(b blocks.ROBlob, reqs []Requirement) BlobVerifier + +// NewPayloadAttestationMsgVerifier is a function signature that can be used by code that needs to be +// able to mock Initializer.NewPayloadAttestationMsgVerifier without complex setup. +type NewPayloadAttestationMsgVerifier func(pa payloadattestation.ROMessage, reqs []Requirement) PayloadAttestationMsgVerifier diff --git a/consensus-types/primitives/BUILD.bazel b/consensus-types/primitives/BUILD.bazel index 1ec9295ce799..ee0ca128a436 100644 --- a/consensus-types/primitives/BUILD.bazel +++ b/consensus-types/primitives/BUILD.bazel @@ -10,6 +10,8 @@ go_library( "epoch.go", "execution_address.go", "kzg.go", + "payload_attestations_mainnet.go", + "payload_attestations_minimal.go", #keep "payload_id.go", "ptc_status.go", "randao.go", diff --git a/consensus-types/primitives/payload_attestations_mainnet.go b/consensus-types/primitives/payload_attestations_mainnet.go new file mode 100644 index 000000000000..badba898e724 --- /dev/null +++ b/consensus-types/primitives/payload_attestations_mainnet.go @@ -0,0 +1,9 @@ +//go:build !minimal + +package primitives + +import bitfield "github.com/prysmaticlabs/go-bitfield" + +func NewPayloadAttestationAggregationBits() bitfield.Bitvector512 { + return bitfield.NewBitvector512() +} diff --git a/consensus-types/primitives/payload_attestations_minimal.go b/consensus-types/primitives/payload_attestations_minimal.go new file mode 100644 index 000000000000..3174669b087e --- /dev/null +++ b/consensus-types/primitives/payload_attestations_minimal.go @@ -0,0 +1,9 @@ +//go:build minimal + +package primitives + +import bitfield "github.com/prysmaticlabs/go-bitfield" + +func NewPayloadAttestationAggregationBits() bitfield.Bitvector32 { + return bitfield.NewBitvector32() +}