diff --git a/beacon-chain/blockchain/options.go b/beacon-chain/blockchain/options.go index 38492502a1f9..5158f3dbe799 100644 --- a/beacon-chain/blockchain/options.go +++ b/beacon-chain/blockchain/options.go @@ -69,6 +69,14 @@ func WithDepositCache(c cache.DepositCache) Option { } } +// WithPayloadAttestationCache for payload attestation cache. +func WithPayloadAttestationCache(c *cache.PayloadAttestationCache) Option { + return func(s *Service) error { + s.cfg.PayloadAttestationCache = c + return nil + } +} + // WithPayloadIDCache for payload ID cache. func WithPayloadIDCache(c *cache.PayloadIDCache) Option { return func(s *Service) error { diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index efbc70c7941f..9dcd9b1abe38 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -75,6 +75,7 @@ type config struct { ChainStartFetcher execution.ChainStartFetcher BeaconDB db.HeadAccessDatabase DepositCache cache.DepositCache + PayloadAttestationCache *cache.PayloadAttestationCache PayloadIDCache *cache.PayloadIDCache TrackedValidatorsCache *cache.TrackedValidatorsCache AttPool attestations.Pool diff --git a/beacon-chain/cache/BUILD.bazel b/beacon-chain/cache/BUILD.bazel index 8a0b9d7a99f0..df77e1230247 100644 --- a/beacon-chain/cache/BUILD.bazel +++ b/beacon-chain/cache/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "doc.go", "error.go", "interfaces.go", + "payload_attestation.go", "payload_id.go", "proposer_indices.go", "proposer_indices_disabled.go", # keep @@ -42,6 +43,7 @@ go_library( "//config/params:go_default_library", "//consensus-types/primitives:go_default_library", "//container/slice:go_default_library", + "//crypto/bls:go_default_library", "//crypto/hash:go_default_library", "//crypto/rand:go_default_library", "//encoding/bytesutil:go_default_library", @@ -70,6 +72,7 @@ go_test( "checkpoint_state_test.go", "committee_fuzz_test.go", "committee_test.go", + "payload_attestation_test.go", "payload_id_test.go", "private_access_test.go", "proposer_indices_test.go", @@ -88,6 +91,7 @@ go_test( "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/primitives:go_default_library", + "//crypto/bls:go_default_library", "//encoding/bytesutil:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//testing/assert:go_default_library", diff --git a/beacon-chain/cache/payload_attestation.go b/beacon-chain/cache/payload_attestation.go new file mode 100644 index 000000000000..2a921559b2d7 --- /dev/null +++ b/beacon-chain/cache/payload_attestation.go @@ -0,0 +1,116 @@ +package cache + +import ( + "errors" + "sync" + + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v5/crypto/bls" + "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" + eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" +) + +var errNilPayloadAttestationMessage = errors.New("nil Payload Attestation Message") + +// PayloadAttestationCache keeps a map of all the PTC votes that were seen, +// already aggregated. The key is the beacon block root. +type PayloadAttestationCache struct { + root [32]byte + attestations [primitives.PAYLOAD_INVALID_STATUS]*eth.PayloadAttestation + sync.Mutex +} + +// Seen returns true if a vote for the given Beacon Block Root has already been processed +// for this Payload Timeliness Committee index. This will return true even if +// the Payload status differs. +func (p *PayloadAttestationCache) Seen(root [32]byte, idx uint64) bool { + p.Lock() + defer p.Unlock() + if p.root != root { + return false + } + for _, agg := range p.attestations { + if agg == nil { + continue + } + if agg.AggregationBits.BitAt(idx) { + return true + } + } + return false +} + +// messageToPayloadAttestation creates a PayloadAttestation with a single +// aggregated bit from the passed PayloadAttestationMessage +func messageToPayloadAttestation(att *eth.PayloadAttestationMessage, idx uint64) *eth.PayloadAttestation { + bits := primitives.NewPayloadAttestationAggregationBits() + bits.SetBitAt(idx, true) + data := ð.PayloadAttestationData{ + BeaconBlockRoot: bytesutil.SafeCopyBytes(att.Data.BeaconBlockRoot), + Slot: att.Data.Slot, + PayloadStatus: att.Data.PayloadStatus, + } + return ð.PayloadAttestation{ + AggregationBits: bits, + Data: data, + Signature: bytesutil.SafeCopyBytes(att.Signature), + } +} + +// aggregateSigFromMessage returns the aggregated signature from a Payload +// Attestation by adding the passed signature in the PayloadAttestationMessage, +// no signature validation is performed. +func aggregateSigFromMessage(aggregated *eth.PayloadAttestation, message *eth.PayloadAttestationMessage) ([]byte, error) { + aggSig, err := bls.SignatureFromBytesNoValidation(aggregated.Signature) + if err != nil { + return nil, err + } + sig, err := bls.SignatureFromBytesNoValidation(message.Signature) + if err != nil { + return nil, err + } + return bls.AggregateSignatures([]bls.Signature{aggSig, sig}).Marshal(), nil +} + +// Add adds a PayloadAttestationMessage to the internal cache of aggregated +// PayloadAttestations. +// If the index has already been seen for this attestation status the function does nothing. +// If the root is not the cached root, the function will clear the previous cache +// This function assumes that the message has already been validated. In +// particular that the signature is valid and that the block root corresponds to +// the given slot in the attestation data. +func (p *PayloadAttestationCache) Add(att *eth.PayloadAttestationMessage, idx uint64) error { + if att == nil || att.Data == nil || att.Data.BeaconBlockRoot == nil { + return errNilPayloadAttestationMessage + } + p.Lock() + defer p.Unlock() + root := [32]byte(att.Data.BeaconBlockRoot) + if p.root != root { + p.root = root + p.attestations = [primitives.PAYLOAD_INVALID_STATUS]*eth.PayloadAttestation{} + } + agg := p.attestations[att.Data.PayloadStatus] + if agg == nil { + p.attestations[att.Data.PayloadStatus] = messageToPayloadAttestation(att, idx) + return nil + } + if agg.AggregationBits.BitAt(idx) { + return nil + } + sig, err := aggregateSigFromMessage(agg, att) + if err != nil { + return err + } + agg.Signature = sig + agg.AggregationBits.SetBitAt(idx, true) + return nil +} + +// Clear clears the internal map +func (p *PayloadAttestationCache) Clear() { + p.Lock() + defer p.Unlock() + p.root = [32]byte{} + p.attestations = [primitives.PAYLOAD_INVALID_STATUS]*eth.PayloadAttestation{} +} diff --git a/beacon-chain/cache/payload_attestation_test.go b/beacon-chain/cache/payload_attestation_test.go new file mode 100644 index 000000000000..10c274222edf --- /dev/null +++ b/beacon-chain/cache/payload_attestation_test.go @@ -0,0 +1,95 @@ +package cache + +import ( + "testing" + + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v5/crypto/bls" + "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" + eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v5/testing/require" +) + +func TestPayloadAttestationCache(t *testing.T) { + p := &PayloadAttestationCache{} + + //Test Has seen + root := [32]byte{'r'} + idx := uint64(5) + require.Equal(t, false, p.Seen(root, idx)) + + // Test Add + msg := ð.PayloadAttestationMessage{ + Signature: bls.NewAggregateSignature().Marshal(), + Data: ð.PayloadAttestationData{ + BeaconBlockRoot: root[:], + Slot: 1, + PayloadStatus: primitives.PAYLOAD_PRESENT, + }, + } + + // Add new root + require.NoError(t, p.Add(msg, idx)) + require.Equal(t, true, p.Seen(root, idx)) + require.Equal(t, root, p.root) + att := p.attestations[primitives.PAYLOAD_PRESENT] + indices := att.AggregationBits.BitIndices() + require.DeepEqual(t, []int{int(idx)}, indices) + singleSig := bytesutil.SafeCopyBytes(msg.Signature) + require.DeepEqual(t, singleSig, att.Signature) + + // Test Seen + require.Equal(t, true, p.Seen(root, idx)) + require.Equal(t, false, p.Seen(root, idx+1)) + + // Add another attestation on the same data + msg2 := ð.PayloadAttestationMessage{ + Signature: bls.NewAggregateSignature().Marshal(), + Data: att.Data, + } + idx2 := uint64(7) + require.NoError(t, p.Add(msg2, idx2)) + att = p.attestations[primitives.PAYLOAD_PRESENT] + indices = att.AggregationBits.BitIndices() + require.DeepEqual(t, []int{int(idx), int(idx2)}, indices) + require.DeepNotEqual(t, att.Signature, msg.Signature) + + // Try again the same index + require.NoError(t, p.Add(msg2, idx2)) + att2 := p.attestations[primitives.PAYLOAD_PRESENT] + indices = att.AggregationBits.BitIndices() + require.DeepEqual(t, []int{int(idx), int(idx2)}, indices) + require.DeepEqual(t, att, att2) + + // Test Seen + require.Equal(t, true, p.Seen(root, idx2)) + require.Equal(t, false, p.Seen(root, idx2+1)) + + // Add another payload status for a different payload status + msg3 := ð.PayloadAttestationMessage{ + Signature: bls.NewAggregateSignature().Marshal(), + Data: ð.PayloadAttestationData{ + BeaconBlockRoot: root[:], + Slot: 1, + PayloadStatus: primitives.PAYLOAD_WITHHELD, + }, + } + idx3 := uint64(17) + + require.NoError(t, p.Add(msg3, idx3)) + att3 := p.attestations[primitives.PAYLOAD_WITHHELD] + indices3 := att3.AggregationBits.BitIndices() + require.DeepEqual(t, []int{int(idx3)}, indices3) + require.DeepEqual(t, singleSig, att3.Signature) + + // Add a different root + root2 := [32]byte{'s'} + msg.Data.BeaconBlockRoot = root2[:] + require.NoError(t, p.Add(msg, idx)) + require.Equal(t, root2, p.root) + require.Equal(t, true, p.Seen(root2, idx)) + require.Equal(t, false, p.Seen(root, idx)) + att = p.attestations[primitives.PAYLOAD_PRESENT] + indices = att.AggregationBits.BitIndices() + require.DeepEqual(t, []int{int(idx)}, indices) +} diff --git a/beacon-chain/core/helpers/payload_attestation.go b/beacon-chain/core/helpers/payload_attestation.go index 348c419f8f6e..5f626ff01dee 100644 --- a/beacon-chain/core/helpers/payload_attestation.go +++ b/beacon-chain/core/helpers/payload_attestation.go @@ -68,6 +68,21 @@ func ValidateNilPayloadAttestation(att *eth.PayloadAttestation) error { return ValidateNilPayloadAttestationData(att.Data) } +// InPayloadTimelinessCommittee returns whether the given index belongs to the +// PTC computed from the passed state. +func InPayloadTimelinessCommittee(ctx context.Context, state state.ReadOnlyBeaconState, slot primitives.Slot, idx primitives.ValidatorIndex) (bool, error) { + ptc, err := GetPayloadTimelinessCommittee(ctx, state, slot) + if err != nil { + return false, err + } + for _, i := range ptc { + if i == idx { + return true, nil + } + } + return false, nil +} + // GetPayloadTimelinessCommittee returns the PTC for the given slot, computed from the passed state as in the // spec function `get_ptc`. func GetPayloadTimelinessCommittee(ctx context.Context, state state.ReadOnlyBeaconState, slot primitives.Slot) (indices []primitives.ValidatorIndex, err error) { @@ -246,3 +261,36 @@ func IsValidIndexedPayloadAttestation(state state.ReadOnlyBeaconState, att *epbs return signature.FastAggregateVerify(publicKeys, signingRoot), nil } + +// ValidatePayloadAttestationMessageSignature verifies the signature of a +// payload attestation message. +func ValidatePayloadAttestationMessageSignature(ctx context.Context, st state.ReadOnlyBeaconState, msg *eth.PayloadAttestationMessage) error { + if err := ValidateNilPayloadAttestationMessage(msg); err != nil { + return err + } + val, err := st.ValidatorAtIndex(msg.ValidatorIndex) + if err != nil { + return err + } + pub, err := bls.PublicKeyFromBytes(val.PublicKey) + if err != nil { + return err + } + sig, err := bls.SignatureFromBytes(msg.Signature) + if err != nil { + return err + } + currentEpoch := slots.ToEpoch(st.Slot()) + domain, err := signing.Domain(st.Fork(), currentEpoch, params.BeaconConfig().DomainPTCAttester, st.GenesisValidatorsRoot()) + if err != nil { + return err + } + root, err := signing.ComputeSigningRoot(msg.Data, domain) + if err != nil { + return err + } + if !sig.Verify(pub, root[:]) { + return signing.ErrSigFailedToVerify + } + return nil +} diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index d3f3af2ecfbd..e77229a90ff7 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -102,6 +102,7 @@ type BeaconNode struct { blsToExecPool blstoexec.PoolManager depositCache cache.DepositCache trackedValidatorsCache *cache.TrackedValidatorsCache + payloadAttestationCache *cache.PayloadAttestationCache payloadIDCache *cache.PayloadIDCache stateFeed *event.Feed blockFeed *event.Feed @@ -152,6 +153,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco syncCommitteePool: synccommittee.NewPool(), blsToExecPool: blstoexec.NewPool(), trackedValidatorsCache: cache.NewTrackedValidatorsCache(), + payloadAttestationCache: &cache.PayloadAttestationCache{}, payloadIDCache: cache.NewPayloadIDCache(), slasherBlockHeadersFeed: new(event.Feed), slasherAttestationsFeed: new(event.Feed), @@ -775,6 +777,7 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st blockchain.WithSyncComplete(syncComplete), blockchain.WithBlobStorage(b.BlobStorage), blockchain.WithTrackedValidatorsCache(b.trackedValidatorsCache), + blockchain.WithPayloadAttestationCache(b.payloadAttestationCache), blockchain.WithPayloadIDCache(b.payloadIDCache), blockchain.WithSyncChecker(b.syncChecker), ) @@ -852,6 +855,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil regularsync.WithStateGen(b.stateGen), regularsync.WithSlasherAttestationsFeed(b.slasherAttestationsFeed), regularsync.WithSlasherBlockHeadersFeed(b.slasherBlockHeadersFeed), + regularsync.WithPayloadAttestationCache(b.payloadAttestationCache), regularsync.WithPayloadReconstructor(web3Service), regularsync.WithClockWaiter(b.clockWaiter), regularsync.WithInitialSyncComplete(initialSyncComplete), diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index cba397bcf1bf..86dd4e15748c 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "log.go", "metrics.go", "options.go", + "payload_attestations.go", "pending_attestations_queue.go", "pending_blocks_queue.go", "rate_limiter.go", diff --git a/beacon-chain/sync/options.go b/beacon-chain/sync/options.go index 9b0281ea667f..30513eb662dd 100644 --- a/beacon-chain/sync/options.go +++ b/beacon-chain/sync/options.go @@ -2,6 +2,7 @@ package sync import ( "github.com/prysmaticlabs/prysm/v5/async/event" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/cache" blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation" statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" @@ -127,6 +128,13 @@ func WithSlasherBlockHeadersFeed(slasherBlockHeadersFeed *event.Feed) Option { } } +func WithPayloadAttestationCache(r *cache.PayloadAttestationCache) Option { + return func(s *Service) error { + s.payloadAttestationCache = r + return nil + } +} + func WithPayloadReconstructor(r execution.PayloadReconstructor) Option { return func(s *Service) error { s.cfg.executionPayloadReconstructor = r diff --git a/beacon-chain/sync/payload_attestations.go b/beacon-chain/sync/payload_attestations.go new file mode 100644 index 000000000000..485ca625c8d1 --- /dev/null +++ b/beacon-chain/sync/payload_attestations.go @@ -0,0 +1,111 @@ +package sync + +import ( + "context" + "slices" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v5/monitoring/tracing" + eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "go.opencensus.io/trace" + "google.golang.org/protobuf/proto" +) + +var ( + errInvalidPayloadStatus = errors.New("invalid PayloadStatus") + errInvalidBeaconBlockRoot = errors.New("invalid BeaconBlockRoot") + errInvalidValidatorIndex = errors.New("invalid validator index") + errUnkownBeaconBlockRoot = errors.New("unkonwn beacon block") + errAlreadySeenPayloadAttestation = errors.New("payload attestation already seen for validator index") + errNotInPTC = errors.New("validator index not in Payload Timeliness Committee") +) + +func (s *Service) validatePayloadAttestation(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) { + if pid == s.cfg.p2p.PeerID() { + return pubsub.ValidationAccept, nil + } + if s.cfg.initialSync.Syncing() { + return pubsub.ValidationIgnore, nil + } + ctx, span := trace.StartSpan(ctx, "sync.validatePayloadAttestation") + defer span.End() + if msg.Topic == nil { + return pubsub.ValidationReject, errInvalidTopic + } + m, err := s.decodePubsubMessage(msg) + if err != nil { + tracing.AnnotateError(span, err) + return pubsub.ValidationReject, err + } + att, ok := m.(*eth.PayloadAttestationMessage) + if !ok { + return pubsub.ValidationReject, errWrongMessage + } + if err := helpers.ValidateNilPayloadAttestationMessage(att); err != nil { + return pubsub.ValidationReject, err + } + if att.Data.Slot != s.cfg.clock.CurrentSlot() { + return pubsub.ValidationIgnore, nil + } + if att.Data.PayloadStatus >= primitives.PAYLOAD_INVALID_STATUS { + return pubsub.ValidationReject, errInvalidPayloadStatus + } + root := [32]byte(att.Data.BeaconBlockRoot) + if s.hasBadBlock(root) { + return pubsub.ValidationReject, errInvalidBeaconBlockRoot + } + if !s.cfg.chain.InForkchoice(root) { + return pubsub.ValidationIgnore, errUnkownBeaconBlockRoot + } + st, err := s.cfg.chain.HeadState(ctx) + if err != nil { + return pubsub.ValidationIgnore, err + } + if err := helpers.ValidatePayloadAttestationMessageSignature(ctx, st, att); err != nil { + return pubsub.ValidationReject, err + } + ptc, err := helpers.GetPayloadTimelinessCommittee(ctx, st, att.Data.Slot) + if err != nil { + return pubsub.ValidationIgnore, err + } + idx := slices.Index(ptc, att.ValidatorIndex) + if idx == -1 { + return pubsub.ValidationReject, errNotInPTC + } + if s.payloadAttestationCache.Seen(root, uint64(idx)) { + return pubsub.ValidationIgnore, errAlreadySeenPayloadAttestation + } + return pubsub.ValidationAccept, nil +} + +func (s *Service) payloadAttestationSubscriber(ctx context.Context, msg proto.Message) error { + a, ok := msg.(*eth.PayloadAttestationMessage) + if !ok { + return errWrongMessage + } + if err := helpers.ValidateNilPayloadAttestationMessage(a); err != nil { + return err + } + root := [32]byte(a.Data.BeaconBlockRoot) + st, err := s.cfg.chain.HeadState(ctx) + if err != nil { + return err + } + ptc, err := helpers.GetPayloadTimelinessCommittee(ctx, st, a.Data.Slot) + if err != nil { + return err + } + idx := slices.Index(ptc, a.ValidatorIndex) + if idx == -1 { + return errInvalidValidatorIndex + } + if s.payloadAttestationCache.Seen(root, uint64(idx)) { + return nil + } + + return s.payloadAttestationCache.Add(a, uint64(idx)) +} diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 15196bf6ca74..8d22ed86e2f3 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -19,6 +19,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/async/abool" "github.com/prysmaticlabs/prysm/v5/async/event" "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/cache" blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation" statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" @@ -124,6 +125,7 @@ type Service struct { seenPendingBlocks map[[32]byte]bool blkRootToPendingAtts map[[32]byte][]ethpb.SignedAggregateAttAndProof subHandler *subTopicHandler + payloadAttestationCache *cache.PayloadAttestationCache pendingAttsLock sync.RWMutex pendingQueueLock sync.RWMutex chainStarted *abool.AtomicBool diff --git a/consensus-types/primitives/BUILD.bazel b/consensus-types/primitives/BUILD.bazel index b3a73a307fe7..3659bc949cfa 100644 --- a/consensus-types/primitives/BUILD.bazel +++ b/consensus-types/primitives/BUILD.bazel @@ -9,6 +9,8 @@ go_library( "domain.go", "epoch.go", "execution_address.go", + "payload_attestations_mainnet.go", + "payload_attestations_minimal.go", #keep "payload_id.go", "ptc_status.go", "randao.go", diff --git a/consensus-types/primitives/payload_attestations_mainnet.go b/consensus-types/primitives/payload_attestations_mainnet.go new file mode 100644 index 000000000000..badba898e724 --- /dev/null +++ b/consensus-types/primitives/payload_attestations_mainnet.go @@ -0,0 +1,9 @@ +//go:build !minimal + +package primitives + +import bitfield "github.com/prysmaticlabs/go-bitfield" + +func NewPayloadAttestationAggregationBits() bitfield.Bitvector512 { + return bitfield.NewBitvector512() +} diff --git a/consensus-types/primitives/payload_attestations_minimal.go b/consensus-types/primitives/payload_attestations_minimal.go new file mode 100644 index 000000000000..3174669b087e --- /dev/null +++ b/consensus-types/primitives/payload_attestations_minimal.go @@ -0,0 +1,9 @@ +//go:build minimal + +package primitives + +import bitfield "github.com/prysmaticlabs/go-bitfield" + +func NewPayloadAttestationAggregationBits() bitfield.Bitvector32 { + return bitfield.NewBitvector32() +}