From b6a4e1213c825a27dfe2742ea3eb6b5b4e93fd6e Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Mon, 22 Apr 2024 15:26:27 +0800 Subject: [PATCH] Add Data Column Gossip Handlers (#13894) * Add Data Column Subscriber * Add Data Column Vaidator * Wire all Handlers In * Fix Build * Fix Test * Fix IP in Test * Fix IP in Test --- beacon-chain/blockchain/BUILD.bazel | 1 + beacon-chain/blockchain/receive_block.go | 6 + beacon-chain/blockchain/receive_sidecar.go | 12 ++ beacon-chain/blockchain/testing/mock.go | 5 + beacon-chain/core/blocks/signature.go | 18 +++ beacon-chain/core/feed/operation/events.go | 7 + beacon-chain/p2p/gossip_topic_mappings.go | 1 + beacon-chain/p2p/pubsub_filter_test.go | 2 +- beacon-chain/p2p/topics.go | 5 + beacon-chain/sync/BUILD.bazel | 2 + beacon-chain/sync/decode_pubsub.go | 2 + beacon-chain/sync/service.go | 1 + beacon-chain/sync/subscriber.go | 125 ++++++++++++++-- .../sync/subscriber_data_column_sidecar.go | 34 +++++ .../sync/validate_data_column_sidecar.go | 138 ++++++++++++++++++ 15 files changed, 348 insertions(+), 11 deletions(-) create mode 100644 beacon-chain/blockchain/receive_sidecar.go create mode 100644 beacon-chain/sync/subscriber_data_column_sidecar.go create mode 100644 beacon-chain/sync/validate_data_column_sidecar.go diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index b60a763d0fde..48b32951de39 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "receive_attestation.go", "receive_blob.go", "receive_block.go", + "receive_sidecar.go", "service.go", "tracked_proposer.go", "weak_subjectivity_checks.go", diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index 42f9a9542f55..3030a6a74618 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -51,6 +51,12 @@ type BlobReceiver interface { ReceiveBlob(context.Context, blocks.VerifiedROBlob) error } +// DataColumnReceiver interface defines the methods of chain service for receiving new +// data columns +type DataColumnReceiver interface { + ReceiveDataColumn(context.Context, *ethpb.DataColumnSidecar) error +} + // SlashingReceiver interface defines the methods of chain service for receiving validated slashing over the wire. type SlashingReceiver interface { ReceiveAttesterSlashing(ctx context.Context, slashing ethpb.AttSlashing) diff --git a/beacon-chain/blockchain/receive_sidecar.go b/beacon-chain/blockchain/receive_sidecar.go new file mode 100644 index 000000000000..7ad74311c50e --- /dev/null +++ b/beacon-chain/blockchain/receive_sidecar.go @@ -0,0 +1,12 @@ +package blockchain + +import ( + "context" + + ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" +) + +func (s *Service) ReceiveDataColumn(ctx context.Context, ds *ethpb.DataColumnSidecar) error { + // TODO + return nil +} diff --git a/beacon-chain/blockchain/testing/mock.go b/beacon-chain/blockchain/testing/mock.go index d0da4f0cd07b..b444219a81e9 100644 --- a/beacon-chain/blockchain/testing/mock.go +++ b/beacon-chain/blockchain/testing/mock.go @@ -628,6 +628,11 @@ func (c *ChainService) ReceiveBlob(_ context.Context, b blocks.VerifiedROBlob) e return nil } +// ReceiveDataColumn implements the same method in chain service +func (c *ChainService) ReceiveDataColumn(_ context.Context, _ *ethpb.DataColumnSidecar) error { + return nil +} + // TargetRootForEpoch mocks the same method in the chain service func (c *ChainService) TargetRootForEpoch(_ [32]byte, _ primitives.Epoch) ([32]byte, error) { return c.TargetRoot, nil diff --git a/beacon-chain/core/blocks/signature.go b/beacon-chain/core/blocks/signature.go index dedd5856ec68..a2b1ba2e1b59 100644 --- a/beacon-chain/core/blocks/signature.go +++ b/beacon-chain/core/blocks/signature.go @@ -96,6 +96,24 @@ func VerifyBlockHeaderSignature(beaconState state.BeaconState, header *ethpb.Sig return signing.VerifyBlockHeaderSigningRoot(header.Header, proposerPubKey, header.Signature, domain) } +func VerifyBlockHeaderSignatureUsingCurrentFork(beaconState state.BeaconState, header *ethpb.SignedBeaconBlockHeader) error { + currentEpoch := slots.ToEpoch(header.Header.Slot) + fork, err := forks.Fork(currentEpoch) + if err != nil { + return err + } + domain, err := signing.Domain(fork, currentEpoch, params.BeaconConfig().DomainBeaconProposer, beaconState.GenesisValidatorsRoot()) + if err != nil { + return err + } + proposer, err := beaconState.ValidatorAtIndex(header.Header.ProposerIndex) + if err != nil { + return err + } + proposerPubKey := proposer.PublicKey + return signing.VerifyBlockHeaderSigningRoot(header.Header, proposerPubKey, header.Signature, domain) +} + // VerifyBlockSignatureUsingCurrentFork verifies the proposer signature of a beacon block. This differs // from the above method by not using fork data from the state and instead retrieving it // via the respective epoch. diff --git a/beacon-chain/core/feed/operation/events.go b/beacon-chain/core/feed/operation/events.go index 86287da922e5..a433bf759440 100644 --- a/beacon-chain/core/feed/operation/events.go +++ b/beacon-chain/core/feed/operation/events.go @@ -32,6 +32,9 @@ const ( // AttesterSlashingReceived is sent after an attester slashing is received from gossip or rpc AttesterSlashingReceived = 8 + + // DataColumnSidecarReceived is sent after a data column sidecar is received from gossip or rpc. + DataColumnSidecarReceived = 9 ) // UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events. @@ -77,3 +80,7 @@ type ProposerSlashingReceivedData struct { type AttesterSlashingReceivedData struct { AttesterSlashing ethpb.AttSlashing } + +type DataColumnSidecarReceivedData struct { + DataColumn *ethpb.DataColumnSidecar +} diff --git a/beacon-chain/p2p/gossip_topic_mappings.go b/beacon-chain/p2p/gossip_topic_mappings.go index d88a4499ce2b..12b23ae58823 100644 --- a/beacon-chain/p2p/gossip_topic_mappings.go +++ b/beacon-chain/p2p/gossip_topic_mappings.go @@ -22,6 +22,7 @@ var gossipTopicMappings = map[string]func() proto.Message{ SyncCommitteeSubnetTopicFormat: func() proto.Message { return ðpb.SyncCommitteeMessage{} }, BlsToExecutionChangeSubnetTopicFormat: func() proto.Message { return ðpb.SignedBLSToExecutionChange{} }, BlobSubnetTopicFormat: func() proto.Message { return ðpb.BlobSidecar{} }, + DataColumnSubnetTopicFormat: func() proto.Message { return ðpb.DataColumnSidecar{} }, } // GossipTopicMappings is a function to return the assigned data type diff --git a/beacon-chain/p2p/pubsub_filter_test.go b/beacon-chain/p2p/pubsub_filter_test.go index 236f650a219c..247558191065 100644 --- a/beacon-chain/p2p/pubsub_filter_test.go +++ b/beacon-chain/p2p/pubsub_filter_test.go @@ -90,7 +90,7 @@ func TestService_CanSubscribe(t *testing.T) { formatting := []interface{}{digest} // Special case for attestation subnets which have a second formatting placeholder. - if topic == AttestationSubnetTopicFormat || topic == SyncCommitteeSubnetTopicFormat || topic == BlobSubnetTopicFormat { + if topic == AttestationSubnetTopicFormat || topic == SyncCommitteeSubnetTopicFormat || topic == BlobSubnetTopicFormat || topic == DataColumnSubnetTopicFormat { formatting = append(formatting, 0 /* some subnet ID */) } diff --git a/beacon-chain/p2p/topics.go b/beacon-chain/p2p/topics.go index 3187e36a5cfe..8987b1f6334b 100644 --- a/beacon-chain/p2p/topics.go +++ b/beacon-chain/p2p/topics.go @@ -30,6 +30,9 @@ const ( GossipBlsToExecutionChangeMessage = "bls_to_execution_change" // GossipBlobSidecarMessage is the name for the blob sidecar message type. GossipBlobSidecarMessage = "blob_sidecar" + // GossipDataColumnSidecarMessage is the name for the data column sidecar message type. + GossipDataColumnSidecarMessage = "data_column_sidecar" + // Topic Formats // // AttestationSubnetTopicFormat is the topic format for the attestation subnet. @@ -52,4 +55,6 @@ const ( BlsToExecutionChangeSubnetTopicFormat = GossipProtocolAndDigest + GossipBlsToExecutionChangeMessage // BlobSubnetTopicFormat is the topic format for the blob subnet. BlobSubnetTopicFormat = GossipProtocolAndDigest + GossipBlobSidecarMessage + "_%d" + // DataColumnSubnetTopicFormat is the topic format for the data column subnet. + DataColumnSubnetTopicFormat = GossipProtocolAndDigest + GossipDataColumnSidecarMessage + "_%d" ) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index cba397bcf1bf..ca8d049c2e48 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -37,6 +37,7 @@ go_library( "subscriber_beacon_blocks.go", "subscriber_blob_sidecar.go", "subscriber_bls_to_execution_change.go", + "subscriber_data_column_sidecar.go", "subscriber_handlers.go", "subscriber_sync_committee_message.go", "subscriber_sync_contribution_proof.go", @@ -48,6 +49,7 @@ go_library( "validate_beacon_blocks.go", "validate_blob.go", "validate_bls_to_execution_change.go", + "validate_data_column_sidecar.go", "validate_proposer_slashing.go", "validate_sync_committee_message.go", "validate_sync_contribution_proof.go", diff --git a/beacon-chain/sync/decode_pubsub.go b/beacon-chain/sync/decode_pubsub.go index 1ec9d079448a..529aa8338d98 100644 --- a/beacon-chain/sync/decode_pubsub.go +++ b/beacon-chain/sync/decode_pubsub.go @@ -45,6 +45,8 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err topic = p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})] case strings.Contains(topic, p2p.GossipBlobSidecarMessage): topic = p2p.GossipTypeMapping[reflect.TypeOf(ðpb.BlobSidecar{})] + case strings.Contains(topic, p2p.GossipDataColumnSidecarMessage): + topic = p2p.GossipTypeMapping[reflect.TypeOf(ðpb.DataColumnSidecar{})] } base := p2p.GossipTopicMappings(topic, 0) diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 15196bf6ca74..22e5d9d6b62b 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -102,6 +102,7 @@ type config struct { type blockchainService interface { blockchain.BlockReceiver blockchain.BlobReceiver + blockchain.DataColumnReceiver blockchain.HeadFetcher blockchain.FinalizationFetcher blockchain.ForkFetcher diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 9c9dc827d129..0bfa4d185f0b 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -137,16 +137,32 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { // New Gossip Topic in Deneb if epoch >= params.BeaconConfig().DenebForkEpoch { - s.subscribeStaticWithSubnets( - p2p.BlobSubnetTopicFormat, - s.validateBlob, /* validator */ - s.blobSubscriber, /* message handler */ - digest, - params.BeaconConfig().BlobsidecarSubnetCount, - ) - } - if features.Get().EnablePeerDAS { - // TODO: Subscribe to persistent column subnets here + if features.Get().EnablePeerDAS { + if flags.Get().SubscribeToAllSubnets { + s.subscribeStaticWithSubnets( + p2p.DataColumnSubnetTopicFormat, + s.validateDataColumn, /* validator */ + s.dataColumnSubscriber, /* message handler */ + digest, + params.BeaconConfig().DataColumnSidecarSubnetCount, + ) + } else { + s.subscribeDynamicWithColumnSubnets( + p2p.DataColumnSubnetTopicFormat, + s.validateDataColumn, /* validator */ + s.dataColumnSubscriber, /* message handler */ + digest, + ) + } + } else { + s.subscribeStaticWithSubnets( + p2p.BlobSubnetTopicFormat, + s.validateBlob, /* validator */ + s.blobSubscriber, /* message handler */ + digest, + params.BeaconConfig().BlobsidecarSubnetCount, + ) + } } } @@ -649,6 +665,87 @@ func (s *Service) subscribeDynamicWithSyncSubnets( }() } +// subscribe missing subnets for our persistent columns. +func (s *Service) subscribeColumnSubnet( + subscriptions map[uint64]*pubsub.Subscription, + idx uint64, + digest [4]byte, + validate wrappedVal, + handle subHandler, +) { + // do not subscribe if we have no peers in the same + // subnet + topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.DataColumnSidecar{})] + subnetTopic := fmt.Sprintf(topic, digest, idx) + // check if subscription exists and if not subscribe the relevant subnet. + if _, exists := subscriptions[idx]; !exists { + subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle) + } + if !s.validPeersExist(subnetTopic) { + log.Debugf("No peers found subscribed to column gossip subnet with "+ + "column index %d. Searching network for peers subscribed to the subnet.", idx) + _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) + if err != nil { + log.WithError(err).Debug("Could not search for peers") + } + } +} + +func (s *Service) subscribeDynamicWithColumnSubnets( + topicFormat string, + validate wrappedVal, + handle subHandler, + digest [4]byte, +) { + genRoot := s.cfg.clock.GenesisValidatorsRoot() + _, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:]) + if err != nil { + panic(err) + } + base := p2p.GossipTopicMappings(topicFormat, e) + if base == nil { + panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) + } + subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().DataColumnSidecarSubnetCount) + genesis := s.cfg.clock.GenesisTime() + ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot) + + go func() { + for { + select { + case <-s.ctx.Done(): + ticker.Done() + return + case <-ticker.C(): + if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { + continue + } + valid, err := isDigestValid(digest, genesis, genRoot) + if err != nil { + log.Error(err) + continue + } + if !valid { + log.Warnf("Column subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest) + // Unsubscribes from all our current subnets. + s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest) + ticker.Done() + return + } + + wantedSubs := s.retrieveActiveColumnSubnets() + // Resize as appropriate. + s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest) + + // subscribe desired column subnets. + for _, idx := range wantedSubs { + s.subscribeColumnSubnet(subscriptions, idx, digest, validate, handle) + } + } + } + }() +} + // lookup peers for attester specific subnets. func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) { topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})] @@ -700,6 +797,14 @@ func (*Service) retrieveActiveSyncSubnets(currEpoch primitives.Epoch) []uint64 { return slice.SetUint64(subs) } +func (*Service) retrieveActiveColumnSubnets() []uint64 { + subs, ok, _ := cache.ColumnSubnetIDs.GetColumnSubnets() + if !ok { + return nil + } + return subs +} + // filters out required peers for the node to function, not // pruning peers who are in our attestation subnets. func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID { diff --git a/beacon-chain/sync/subscriber_data_column_sidecar.go b/beacon-chain/sync/subscriber_data_column_sidecar.go new file mode 100644 index 000000000000..98c66c082a9e --- /dev/null +++ b/beacon-chain/sync/subscriber_data_column_sidecar.go @@ -0,0 +1,34 @@ +package sync + +import ( + "context" + "fmt" + + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed" + opfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation" + ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "google.golang.org/protobuf/proto" +) + +func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) error { + b, ok := msg.(*ethpb.DataColumnSidecar) + if !ok { + return fmt.Errorf("message was not type DataColumnSidecar, type=%T", msg) + } + + // TODO:Change to new one for data columns + s.setSeenBlobIndex(b.SignedBlockHeader.Header.Slot, b.SignedBlockHeader.Header.ProposerIndex, b.ColumnIndex) + + if err := s.cfg.chain.ReceiveDataColumn(ctx, b); err != nil { + return err + } + + s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{ + Type: opfeed.DataColumnSidecarReceived, + Data: &opfeed.DataColumnSidecarReceivedData{ + DataColumn: b, + }, + }) + + return nil +} diff --git a/beacon-chain/sync/validate_data_column_sidecar.go b/beacon-chain/sync/validate_data_column_sidecar.go new file mode 100644 index 000000000000..4406051802de --- /dev/null +++ b/beacon-chain/sync/validate_data_column_sidecar.go @@ -0,0 +1,138 @@ +package sync + +import ( + "context" + "fmt" + "strings" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/blocks" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition" + "github.com/prysmaticlabs/prysm/v5/config/params" + eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + prysmTime "github.com/prysmaticlabs/prysm/v5/time" + "github.com/prysmaticlabs/prysm/v5/time/slots" + "github.com/sirupsen/logrus" +) + +func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) { + receivedTime := prysmTime.Now() + + if pid == s.cfg.p2p.PeerID() { + return pubsub.ValidationAccept, nil + } + if s.cfg.initialSync.Syncing() { + return pubsub.ValidationIgnore, nil + } + if msg.Topic == nil { + return pubsub.ValidationReject, errInvalidTopic + } + m, err := s.decodePubsubMessage(msg) + if err != nil { + log.WithError(err).Error("Failed to decode message") + return pubsub.ValidationReject, err + } + + ds, ok := m.(*eth.DataColumnSidecar) + if !ok { + log.WithField("message", m).Error("Message is not of type *eth.DataColumnSidecar") + return pubsub.ValidationReject, errWrongMessage + } + if ds.ColumnIndex >= params.BeaconConfig().NumberOfColumns { + return pubsub.ValidationReject, errors.Errorf("invalid column index provided, got %d", ds.ColumnIndex) + } + want := fmt.Sprintf("data_column_sidecar_%d", computeSubnetForColumnSidecar(ds.ColumnIndex)) + if !strings.Contains(*msg.Topic, want) { + log.Debug("Column Sidecar index does not match topic") + return pubsub.ValidationReject, fmt.Errorf("wrong topic name: %s", *msg.Topic) + } + if err := slots.VerifyTime(uint64(s.cfg.clock.GenesisTime().Unix()), ds.SignedBlockHeader.Header.Slot, params.BeaconConfig().MaximumGossipClockDisparityDuration()); err != nil { + log.WithError(err).Debug("Ignored sidecar: could not verify slot time") + return pubsub.ValidationIgnore, nil + } + cp := s.cfg.chain.FinalizedCheckpt() + startSlot, err := slots.EpochStart(cp.Epoch) + if err != nil { + log.WithError(err).Debug("Ignored column sidecar: could not calculate epoch start slot") + return pubsub.ValidationIgnore, nil + } + if startSlot >= ds.SignedBlockHeader.Header.Slot { + err := fmt.Errorf("finalized slot %d greater or equal to block slot %d", startSlot, ds.SignedBlockHeader.Header.Slot) + log.Debug(err) + return pubsub.ValidationIgnore, err + } + // Handle sidecar when the parent is unknown. + if !s.cfg.chain.HasBlock(ctx, [32]byte(ds.SignedBlockHeader.Header.ParentRoot)) { + err := errors.Errorf("unknown parent for data column sidecar with slot %d and parent root %#x", ds.SignedBlockHeader.Header.Slot, ds.SignedBlockHeader.Header.ParentRoot) + log.WithError(err).Debug("Could not identify parent for data column sidecar") + return pubsub.ValidationIgnore, err + } + if s.hasBadBlock([32]byte(ds.SignedBlockHeader.Header.ParentRoot)) { + bRoot, err := ds.SignedBlockHeader.Header.HashTreeRoot() + if err != nil { + return pubsub.ValidationIgnore, err + } + s.setBadBlock(ctx, bRoot) + return pubsub.ValidationReject, errors.Errorf("column sidecar with bad parent provided") + } + parentSlot, err := s.cfg.chain.RecentBlockSlot([32]byte(ds.SignedBlockHeader.Header.ParentRoot)) + if err != nil { + return pubsub.ValidationIgnore, err + } + if ds.SignedBlockHeader.Header.Slot <= parentSlot { + return pubsub.ValidationReject, errors.Errorf("invalid column sidecar slot: %d", ds.SignedBlockHeader.Header.Slot) + } + if !s.cfg.chain.InForkchoice([32]byte(ds.SignedBlockHeader.Header.ParentRoot)) { + return pubsub.ValidationReject, blockchain.ErrNotDescendantOfFinalized + } + // TODO Verify KZG inclusion proof of data column sidecar + + // TODO Verify KZG proofs of column sidecar + + parentState, err := s.cfg.stateGen.StateByRoot(ctx, [32]byte(ds.SignedBlockHeader.Header.ParentRoot)) + if err != nil { + return pubsub.ValidationIgnore, err + } + + if err := blocks.VerifyBlockHeaderSignatureUsingCurrentFork(parentState, ds.SignedBlockHeader); err != nil { + return pubsub.ValidationReject, err + } + // In the event the block is more than an epoch ahead from its + // parent state, we have to advance the state forward. + parentRoot := ds.SignedBlockHeader.Header.ParentRoot + parentState, err = transition.ProcessSlotsUsingNextSlotCache(ctx, parentState, parentRoot, ds.SignedBlockHeader.Header.Slot) + if err != nil { + return pubsub.ValidationIgnore, err + } + idx, err := helpers.BeaconProposerIndex(ctx, parentState) + if err != nil { + return pubsub.ValidationIgnore, err + } + if ds.SignedBlockHeader.Header.ProposerIndex != idx { + return pubsub.ValidationReject, errors.New("incorrect proposer index") + } + + startTime, err := slots.ToTime(uint64(s.cfg.chain.GenesisTime().Unix()), ds.SignedBlockHeader.Header.Slot) + if err != nil { + return pubsub.ValidationIgnore, err + } + + sinceSlotStartTime := receivedTime.Sub(startTime) + validationTime := s.cfg.clock.Now().Sub(receivedTime) + + log.WithFields(logrus.Fields{ + "sinceSlotStartTime": sinceSlotStartTime, + "validationTime": validationTime, + }).Debug("Received data column sidecar") + + msg.ValidatorData = ds + return pubsub.ValidationAccept, nil +} + +func computeSubnetForColumnSidecar(colIdx uint64) uint64 { + return colIdx % params.BeaconConfig().DataColumnSidecarSubnetCount +}