From 92e95b37690c70c451d6a5caed3984892844a822 Mon Sep 17 00:00:00 2001
From: Manu NALEPA
Date: Thu, 19 Sep 2024 11:12:56 +0200
Subject: [PATCH] PeerDAS: Several updates (#14459)

* `validateDataColumn`: Refactor logging.
* `dataColumnSidecarByRootRPCHandler`: Improve logging.
* `isDataAvailable`: Improve logging.
* Add hidden debug flag: `--data-columns-ignore-slot-multiple`.
* Add more logs about peer disconnection.
* `validPeersExist` --> `enoughPeersAreConnected`
* `beaconBlocksByRangeRPCHandler`: Add remote peer ID to logs.
* Stop calling `writeErrorResponseToStream` twice in case of rate limit.
---
 beacon-chain/blockchain/process_block.go      |  15 +-
 beacon-chain/p2p/connection_gater.go          |   2 +-
 beacon-chain/p2p/discovery.go                 |   2 +-
 beacon-chain/p2p/handshake.go                 | 141 ++++++++++++------
 beacon-chain/p2p/peers/scorers/BUILD.bazel    |   1 +
 .../p2p/peers/scorers/bad_responses.go        |  19 ++-
 .../p2p/peers/scorers/bad_responses_test.go   |  26 ++--
 .../p2p/peers/scorers/block_providers.go      |   4 +-
 .../p2p/peers/scorers/block_providers_test.go |   4 +-
 .../p2p/peers/scorers/gossip_scorer.go        |  16 +-
 .../p2p/peers/scorers/gossip_scorer_test.go   |   4 +-
 beacon-chain/p2p/peers/scorers/peer_status.go |  18 ++-
 .../p2p/peers/scorers/peer_status_test.go     |  32 ++--
 beacon-chain/p2p/peers/scorers/service.go     |  26 ++--
 .../p2p/peers/scorers/service_test.go         |  22 +--
 beacon-chain/p2p/peers/status.go              |  37 +++--
 beacon-chain/p2p/peers/status_test.go         |  14 +-
 beacon-chain/p2p/service.go                   |   4 +-
 beacon-chain/sync/pending_blocks_queue.go     |   2 +-
 beacon-chain/sync/rate_limiter_test.go        |   2 +-
 beacon-chain/sync/rpc.go                      |  10 +-
 .../sync/rpc_beacon_blocks_by_range.go        |  30 +++-
 .../sync/rpc_blob_sidecars_by_range.go        |   8 +-
 .../sync/rpc_data_column_sidecars_by_range.go |   7 +-
 .../sync/rpc_data_column_sidecars_by_root.go  |  52 +++++--
 beacon-chain/sync/rpc_goodbye.go              |   7 +-
 beacon-chain/sync/rpc_status.go               |  19 ++-
 beacon-chain/sync/rpc_status_test.go          |   4 +-
 beacon-chain/sync/subscriber.go               |  20 +--
 beacon-chain/sync/validate_data_column.go     |  30 +++-
 config/features/config.go                     |  10 +-
 config/features/flags.go                      |   8 +
 runtime/logging/data_column.go                |  17 +--
 validator/client/propose.go                   |   2 +-
 34 files changed, 400 insertions(+), 215 deletions(-)

diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go
index 8a2a4c48b2e9..1d942d0e5682 100644
--- a/beacon-chain/blockchain/process_block.go
+++ b/beacon-chain/blockchain/process_block.go
@@ -746,7 +746,7 @@ func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, si
 			"root":            fmt.Sprintf("%#x", root),
 			"columnsExpected": expected,
 			"columnsWaiting":  missing,
-		}).Error("Some data columns are still unavailable at slot end.")
+		}).Error("Some data columns are still unavailable at slot end")
 	})
 	defer nst.Stop()
 
@@ -779,12 +779,15 @@ func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, si
 				return nil
 			}
 		case <-ctx.Done():
-			missingIndexes := make([]uint64, 0, len(missingMap))
-			for val := range missingMap {
-				copiedVal := val
-				missingIndexes = append(missingIndexes, copiedVal)
+			var missingIndices interface{} = "all"
+			numberOfColumns := params.BeaconConfig().NumberOfColumns
+			missingIndicesCount := uint64(len(missingMap))
+
+			if missingIndicesCount < numberOfColumns {
+				missingIndices = uint64MapToSortedSlice(missingMap)
 			}
-			return errors.Wrapf(ctx.Err(), "context deadline waiting for data column sidecars slot: %d, BlockRoot: %#x, missing %v", block.Slot(), root, missingIndexes)
+
+			return errors.Wrapf(ctx.Err(), "context deadline 
waiting for data column sidecars slot: %d, BlockRoot: %#x, missing %v", block.Slot(), root, missingIndices) } } } diff --git a/beacon-chain/p2p/connection_gater.go b/beacon-chain/p2p/connection_gater.go index a573bea81cc6..8147d07b6c4c 100644 --- a/beacon-chain/p2p/connection_gater.go +++ b/beacon-chain/p2p/connection_gater.go @@ -33,7 +33,7 @@ func (*Service) InterceptPeerDial(_ peer.ID) (allow bool) { // multiaddr for the given peer. func (s *Service) InterceptAddrDial(pid peer.ID, m multiaddr.Multiaddr) (allow bool) { // Disallow bad peers from dialing in. - if s.peers.IsBad(pid) { + if s.peers.IsBad(pid) != nil { return false } return filterConnections(s.addrFilter, m) diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 8e26489c5d0f..6dd97560fce5 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -587,7 +587,7 @@ func (s *Service) filterPeer(node *enode.Node) bool { } // Ignore bad nodes. - if s.peers.IsBad(peerData.ID) { + if s.peers.IsBad(peerData.ID) != nil { return false } diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go index 3bf9bd7e7ddb..623d32be2b6e 100644 --- a/beacon-chain/p2p/handshake.go +++ b/beacon-chain/p2p/handshake.go @@ -2,7 +2,6 @@ package p2p import ( "context" - "errors" "fmt" "io" "sync" @@ -10,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata" prysmTime "github.com/prysmaticlabs/prysm/v5/time" @@ -22,7 +22,57 @@ const ( ) func peerMultiaddrString(conn network.Conn) string { - return fmt.Sprintf("%s/p2p/%s", conn.RemoteMultiaddr().String(), conn.RemotePeer().String()) + remoteMultiaddr := conn.RemoteMultiaddr().String() + remotePeerID := conn.RemotePeer().String() + return fmt.Sprintf("%s/p2p/%s", remoteMultiaddr, remotePeerID) +} + +func (s *Service) connectToPeer(conn network.Conn) { + s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected) + // Go through the handshake process. + log.WithFields(logrus.Fields{ + "direction": conn.Stat().Direction.String(), + "multiAddr": peerMultiaddrString(conn), + "activePeers": len(s.peers.Active()), + }).Debug("Initiate peer connection") +} + +func (s *Service) disconnectFromPeer( + conn network.Conn, + goodByeFunc func(ctx context.Context, id peer.ID) error, + badPeerErr error, +) { + // Get the remote peer ID. + remotePeerID := conn.RemotePeer() + + // Get the direction of the connection. + direction := conn.Stat().Direction.String() + + // Get the remote peer multiaddr. + remotePeerMultiAddr := peerMultiaddrString(conn) + + // Set the peer to disconnecting state. + s.peers.SetConnectionState(remotePeerID, peers.PeerDisconnecting) + + // Only attempt a goodbye if we are still connected to the peer. + if s.host.Network().Connectedness(remotePeerID) == network.Connected { + if err := goodByeFunc(context.TODO(), remotePeerID); err != nil { + log.WithError(err).Error("Unable to disconnect from peer") + } + } + + // Get the remaining active peers. + activePeerCount := len(s.peers.Active()) + log. + WithError(badPeerErr). + WithFields(logrus.Fields{ + "multiaddr": remotePeerMultiAddr, + "direction": direction, + "remainingActivePeers": activePeerCount, + }). 
+ Debug("Initiate peer disconnection") + + s.peers.SetConnectionState(remotePeerID, peers.PeerDisconnected) } // AddConnectionHandler adds a callback function which handles the connection with a @@ -59,16 +109,7 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con s.host.Network().Notify(&network.NotifyBundle{ ConnectedF: func(net network.Network, conn network.Conn) { remotePeer := conn.RemotePeer() - disconnectFromPeer := func() { - s.peers.SetConnectionState(remotePeer, peers.PeerDisconnecting) - // Only attempt a goodbye if we are still connected to the peer. - if s.host.Network().Connectedness(remotePeer) == network.Connected { - if err := goodByeFunc(context.TODO(), remotePeer); err != nil { - log.WithError(err).Error("Unable to disconnect from peer") - } - } - s.peers.SetConnectionState(remotePeer, peers.PeerDisconnected) - } + // Connection handler must be non-blocking as part of libp2p design. go func() { if peerHandshaking(remotePeer) { @@ -77,30 +118,22 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con return } defer peerFinished(remotePeer) + // Handle the various pre-existing conditions that will result in us not handshaking. peerConnectionState, err := s.peers.ConnectionState(remotePeer) if err == nil && (peerConnectionState == peers.PeerConnected || peerConnectionState == peers.PeerConnecting) { log.WithField("currentState", peerConnectionState).WithField("reason", "already active").Trace("Ignoring connection request") return } + s.peers.Add(nil /* ENR */, remotePeer, conn.RemoteMultiaddr(), conn.Stat().Direction) + // Defensive check in the event we still get a bad peer. - if s.peers.IsBad(remotePeer) { - log.WithField("reason", "bad peer").Trace("Ignoring connection request") - disconnectFromPeer() + if err := s.peers.IsBad(remotePeer); err != nil { + s.disconnectFromPeer(conn, goodByeFunc, err) return } - validPeerConnection := func() { - s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected) - // Go through the handshake process. - log.WithFields(logrus.Fields{ - "direction": conn.Stat().Direction.String(), - "multiAddr": peerMultiaddrString(conn), - "activePeers": len(s.peers.Active()), - }).Debug("New peer connection") - } - // Do not perform handshake on inbound dials. if conn.Stat().Direction == network.DirInbound { _, err := s.peers.ChainState(remotePeer) @@ -118,67 +151,83 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con // If peer hasn't sent a status request, we disconnect with them if _, err := s.peers.ChainState(remotePeer); errors.Is(err, peerdata.ErrPeerUnknown) || errors.Is(err, peerdata.ErrNoPeerStatus) { statusMessageMissing.Inc() - disconnectFromPeer() + s.disconnectFromPeer(conn, goodByeFunc, errors.Wrap(err, "chain state")) return } + if peerExists { updated, err := s.peers.ChainStateLastUpdated(remotePeer) if err != nil { - disconnectFromPeer() + s.disconnectFromPeer(conn, goodByeFunc, errors.Wrap(err, "chain state last updated")) return } - // exit if we don't receive any current status messages from - // peer. - if updated.IsZero() || !updated.After(currentTime) { - disconnectFromPeer() + + // Exit if we don't receive any current status messages from peer. 
+ if updated.IsZero() { + s.disconnectFromPeer(conn, goodByeFunc, errors.New("is zero")) + return + } + + if !updated.After(currentTime) { + s.disconnectFromPeer(conn, goodByeFunc, errors.New("did not update")) return } } - validPeerConnection() + + s.connectToPeer(conn) return } s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting) if err := reqFunc(context.TODO(), conn.RemotePeer()); err != nil && !errors.Is(err, io.EOF) { - log.WithError(err).Trace("Handshake failed") - disconnectFromPeer() + s.disconnectFromPeer(conn, goodByeFunc, err) return } - validPeerConnection() + + s.connectToPeer(conn) }() }, }) } -// AddDisconnectionHandler disconnects from peers. It handles updating the peer status. +// AddDisconnectionHandler disconnects from peers. It handles updating the peer status. // This also calls the handler responsible for maintaining other parts of the sync or p2p system. func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id peer.ID) error) { s.host.Network().Notify(&network.NotifyBundle{ DisconnectedF: func(net network.Network, conn network.Conn) { - log := log.WithField("multiAddr", peerMultiaddrString(conn)) + remotePeerMultiAddr := peerMultiaddrString(conn) + peerID := conn.RemotePeer() + direction := conn.Stat().Direction.String() + + log := log.WithFields(logrus.Fields{ + "multiAddr": remotePeerMultiAddr, + "direction": direction, + }) + // Must be handled in a goroutine as this callback cannot be blocking. go func() { // Exit early if we are still connected to the peer. - if net.Connectedness(conn.RemotePeer()) == network.Connected { + if net.Connectedness(peerID) == network.Connected { return } - priorState, err := s.peers.ConnectionState(conn.RemotePeer()) + + priorState, err := s.peers.ConnectionState(peerID) if err != nil { // Can happen if the peer has already disconnected, so... priorState = peers.PeerDisconnected } - s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting) + + s.peers.SetConnectionState(peerID, peers.PeerDisconnecting) if err := handler(context.TODO(), conn.RemotePeer()); err != nil { log.WithError(err).Error("Disconnect handler failed") } - s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected) + + s.peers.SetConnectionState(peerID, peers.PeerDisconnected) + // Only log disconnections if we were fully connected. 
if priorState == peers.PeerConnected { - log.WithFields(logrus.Fields{ - "direction": conn.Stat().Direction.String(), - "multiAddr": peerMultiaddrString(conn), - "activePeers": len(s.peers.Active()), - }).Debug("Peer disconnected") + activePeersCount := len(s.peers.Active()) + log.WithField("remainingActivePeers", activePeersCount).Debug("Peer disconnected") } }() }, diff --git a/beacon-chain/p2p/peers/scorers/BUILD.bazel b/beacon-chain/p2p/peers/scorers/BUILD.bazel index e6eb6a277c8e..463ade4fa264 100644 --- a/beacon-chain/p2p/peers/scorers/BUILD.bazel +++ b/beacon-chain/p2p/peers/scorers/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//crypto/rand:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "@com_github_libp2p_go_libp2p//core/peer:go_default_library", + "@com_github_pkg_errors//:go_default_library", ], ) diff --git a/beacon-chain/p2p/peers/scorers/bad_responses.go b/beacon-chain/p2p/peers/scorers/bad_responses.go index 73d74ecfc0a6..9e834e25780f 100644 --- a/beacon-chain/p2p/peers/scorers/bad_responses.go +++ b/beacon-chain/p2p/peers/scorers/bad_responses.go @@ -4,6 +4,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata" ) @@ -61,7 +62,7 @@ func (s *BadResponsesScorer) Score(pid peer.ID) float64 { // scoreNoLock is a lock-free version of Score. func (s *BadResponsesScorer) scoreNoLock(pid peer.ID) float64 { - if s.isBadPeerNoLock(pid) { + if s.isBadPeerNoLock(pid) != nil { return BadPeerScore } score := float64(0) @@ -116,18 +117,24 @@ func (s *BadResponsesScorer) Increment(pid peer.ID) { // IsBadPeer states if the peer is to be considered bad. // If the peer is unknown this will return `false`, which makes using this function easier than returning an error. -func (s *BadResponsesScorer) IsBadPeer(pid peer.ID) bool { +func (s *BadResponsesScorer) IsBadPeer(pid peer.ID) error { s.store.RLock() defer s.store.RUnlock() + return s.isBadPeerNoLock(pid) } // isBadPeerNoLock is lock-free version of IsBadPeer. -func (s *BadResponsesScorer) isBadPeerNoLock(pid peer.ID) bool { +func (s *BadResponsesScorer) isBadPeerNoLock(pid peer.ID) error { if peerData, ok := s.store.PeerData(pid); ok { - return peerData.BadResponses >= s.config.Threshold + if peerData.BadResponses >= s.config.Threshold { + return errors.Errorf("peer exceeded bad responses threshold: got %d, threshold %d", peerData.BadResponses, s.config.Threshold) + } + + return nil } - return false + + return nil } // BadPeers returns the peers that are considered bad. 
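The refactor above is the core of this patch: scorer predicates such as IsBadPeer now return an error instead of a bool, so a nil result means "not bad" and a non-nil result carries the reason, which callers can log or wrap. A minimal sketch of the pattern in isolation (threshold, badResponses, and isBadPeer are illustrative names, not code from this patch):

    package main

    import (
    	"fmt"

    	"github.com/pkg/errors"
    )

    const threshold = 6

    // isBadPeer mirrors the refactored predicate: nil means "not bad", and a
    // non-nil error explains why the peer crossed the threshold.
    func isBadPeer(badResponses int) error {
    	if badResponses >= threshold {
    		return errors.Errorf("peer exceeded bad responses threshold: got %d, threshold %d", badResponses, threshold)
    	}
    	return nil
    }

    func main() {
    	// Call sites that used to branch on a boolean now branch on nil-ness.
    	if err := isBadPeer(7); err != nil {
    		fmt.Println("disconnecting:", err)
    	}
    }
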
@@ -137,7 +144,7 @@ func (s *BadResponsesScorer) BadPeers() []peer.ID { badPeers := make([]peer.ID, 0) for pid := range s.store.Peers() { - if s.isBadPeerNoLock(pid) { + if s.isBadPeerNoLock(pid) != nil { badPeers = append(badPeers, pid) } } diff --git a/beacon-chain/p2p/peers/scorers/bad_responses_test.go b/beacon-chain/p2p/peers/scorers/bad_responses_test.go index 186a50f55d98..094be28d5f5a 100644 --- a/beacon-chain/p2p/peers/scorers/bad_responses_test.go +++ b/beacon-chain/p2p/peers/scorers/bad_responses_test.go @@ -33,19 +33,19 @@ func TestScorers_BadResponses_Score(t *testing.T) { assert.Equal(t, 0., scorer.Score(pid), "Unexpected score for unregistered peer") scorer.Increment(pid) - assert.Equal(t, false, scorer.IsBadPeer(pid)) + assert.NoError(t, scorer.IsBadPeer(pid)) assert.Equal(t, -2.5, scorer.Score(pid)) scorer.Increment(pid) - assert.Equal(t, false, scorer.IsBadPeer(pid)) + assert.NoError(t, scorer.IsBadPeer(pid)) assert.Equal(t, float64(-5), scorer.Score(pid)) scorer.Increment(pid) - assert.Equal(t, false, scorer.IsBadPeer(pid)) + assert.NoError(t, scorer.IsBadPeer(pid)) assert.Equal(t, float64(-7.5), scorer.Score(pid)) scorer.Increment(pid) - assert.Equal(t, true, scorer.IsBadPeer(pid)) + assert.NotNil(t, scorer.IsBadPeer(pid)) assert.Equal(t, -100.0, scorer.Score(pid)) } @@ -152,17 +152,17 @@ func TestScorers_BadResponses_IsBadPeer(t *testing.T) { }) scorer := peerStatuses.Scorers().BadResponsesScorer() pid := peer.ID("peer1") - assert.Equal(t, false, scorer.IsBadPeer(pid)) + assert.NoError(t, scorer.IsBadPeer(pid)) peerStatuses.Add(nil, pid, nil, network.DirUnknown) - assert.Equal(t, false, scorer.IsBadPeer(pid)) + assert.NoError(t, scorer.IsBadPeer(pid)) for i := 0; i < scorers.DefaultBadResponsesThreshold; i++ { scorer.Increment(pid) if i == scorers.DefaultBadResponsesThreshold-1 { - assert.Equal(t, true, scorer.IsBadPeer(pid), "Unexpected peer status") + assert.NotNil(t, scorer.IsBadPeer(pid), "Unexpected peer status") } else { - assert.Equal(t, false, scorer.IsBadPeer(pid), "Unexpected peer status") + assert.NoError(t, scorer.IsBadPeer(pid), "Unexpected peer status") } } } @@ -185,11 +185,11 @@ func TestScorers_BadResponses_BadPeers(t *testing.T) { scorer.Increment(pids[2]) scorer.Increment(pids[4]) } - assert.Equal(t, false, scorer.IsBadPeer(pids[0]), "Invalid peer status") - assert.Equal(t, true, scorer.IsBadPeer(pids[1]), "Invalid peer status") - assert.Equal(t, true, scorer.IsBadPeer(pids[2]), "Invalid peer status") - assert.Equal(t, false, scorer.IsBadPeer(pids[3]), "Invalid peer status") - assert.Equal(t, true, scorer.IsBadPeer(pids[4]), "Invalid peer status") + assert.NoError(t, scorer.IsBadPeer(pids[0]), "Invalid peer status") + assert.NotNil(t, scorer.IsBadPeer(pids[1]), "Invalid peer status") + assert.NotNil(t, scorer.IsBadPeer(pids[2]), "Invalid peer status") + assert.NoError(t, scorer.IsBadPeer(pids[3]), "Invalid peer status") + assert.NotNil(t, scorer.IsBadPeer(pids[4]), "Invalid peer status") want := []peer.ID{pids[1], pids[2], pids[4]} badPeers := scorer.BadPeers() sort.Slice(badPeers, func(i, j int) bool { diff --git a/beacon-chain/p2p/peers/scorers/block_providers.go b/beacon-chain/p2p/peers/scorers/block_providers.go index 649ff57009e0..9840b9c08157 100644 --- a/beacon-chain/p2p/peers/scorers/block_providers.go +++ b/beacon-chain/p2p/peers/scorers/block_providers.go @@ -177,8 +177,8 @@ func (s *BlockProviderScorer) processedBlocksNoLock(pid peer.ID) uint64 { // Block provider scorer cannot guarantee that lower score of a peer is indeed a sign of a bad 
peer. // Therefore this scorer never marks peers as bad, and relies on scores to probabilistically sort // out low-scorers (see WeightSorted method). -func (*BlockProviderScorer) IsBadPeer(_ peer.ID) bool { - return false +func (*BlockProviderScorer) IsBadPeer(_ peer.ID) error { + return nil } // BadPeers returns the peers that are considered bad. diff --git a/beacon-chain/p2p/peers/scorers/block_providers_test.go b/beacon-chain/p2p/peers/scorers/block_providers_test.go index bcb2c8d45e36..2420b5579e38 100644 --- a/beacon-chain/p2p/peers/scorers/block_providers_test.go +++ b/beacon-chain/p2p/peers/scorers/block_providers_test.go @@ -481,8 +481,8 @@ func TestScorers_BlockProvider_BadPeerMarking(t *testing.T) { }) scorer := peerStatuses.Scorers().BlockProviderScorer() - assert.Equal(t, false, scorer.IsBadPeer("peer1"), "Unexpected status for unregistered peer") + assert.NoError(t, scorer.IsBadPeer("peer1"), "Unexpected status for unregistered peer") scorer.IncrementProcessedBlocks("peer1", 64) - assert.Equal(t, false, scorer.IsBadPeer("peer1")) + assert.NoError(t, scorer.IsBadPeer("peer1")) assert.Equal(t, 0, len(scorer.BadPeers())) } diff --git a/beacon-chain/p2p/peers/scorers/gossip_scorer.go b/beacon-chain/p2p/peers/scorers/gossip_scorer.go index 5482ebde74e4..1adec7b9eb87 100644 --- a/beacon-chain/p2p/peers/scorers/gossip_scorer.go +++ b/beacon-chain/p2p/peers/scorers/gossip_scorer.go @@ -2,6 +2,7 @@ package scorers import ( "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata" pbrpc "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" ) @@ -51,19 +52,24 @@ func (s *GossipScorer) scoreNoLock(pid peer.ID) float64 { } // IsBadPeer states if the peer is to be considered bad. -func (s *GossipScorer) IsBadPeer(pid peer.ID) bool { +func (s *GossipScorer) IsBadPeer(pid peer.ID) error { s.store.RLock() defer s.store.RUnlock() return s.isBadPeerNoLock(pid) } // isBadPeerNoLock is lock-free version of IsBadPeer. -func (s *GossipScorer) isBadPeerNoLock(pid peer.ID) bool { +func (s *GossipScorer) isBadPeerNoLock(pid peer.ID) error { peerData, ok := s.store.PeerData(pid) if !ok { - return false + return nil } - return peerData.GossipScore < gossipThreshold + + if peerData.GossipScore < gossipThreshold { + return errors.Errorf("gossip score below threshold: got %f - threshold %f", peerData.GossipScore, gossipThreshold) + } + + return nil } // BadPeers returns the peers that are considered bad. 
@@ -73,7 +79,7 @@ func (s *GossipScorer) BadPeers() []peer.ID { badPeers := make([]peer.ID, 0) for pid := range s.store.Peers() { - if s.isBadPeerNoLock(pid) { + if s.isBadPeerNoLock(pid) != nil { badPeers = append(badPeers, pid) } } diff --git a/beacon-chain/p2p/peers/scorers/gossip_scorer_test.go b/beacon-chain/p2p/peers/scorers/gossip_scorer_test.go index 98fccf38d1d6..228ca53f24fb 100644 --- a/beacon-chain/p2p/peers/scorers/gossip_scorer_test.go +++ b/beacon-chain/p2p/peers/scorers/gossip_scorer_test.go @@ -34,7 +34,7 @@ func TestScorers_Gossip_Score(t *testing.T) { }, check: func(scorer *scorers.GossipScorer) { assert.Equal(t, -101.0, scorer.Score("peer1"), "Unexpected score") - assert.Equal(t, true, scorer.IsBadPeer("peer1"), "Unexpected good peer") + assert.NotNil(t, scorer.IsBadPeer("peer1"), "Unexpected good peer") }, }, { @@ -44,7 +44,7 @@ func TestScorers_Gossip_Score(t *testing.T) { }, check: func(scorer *scorers.GossipScorer) { assert.Equal(t, 10.0, scorer.Score("peer1"), "Unexpected score") - assert.Equal(t, false, scorer.IsBadPeer("peer1"), "Unexpected bad peer") + assert.NoError(t, scorer.IsBadPeer("peer1"), "Unexpected bad peer") _, _, topicMap, err := scorer.GossipData("peer1") assert.NoError(t, err) assert.Equal(t, uint64(100), topicMap["a"].TimeInMesh, "incorrect time in mesh") diff --git a/beacon-chain/p2p/peers/scorers/peer_status.go b/beacon-chain/p2p/peers/scorers/peer_status.go index 5153c0c784dc..6003bb4b71c7 100644 --- a/beacon-chain/p2p/peers/scorers/peer_status.go +++ b/beacon-chain/p2p/peers/scorers/peer_status.go @@ -46,7 +46,7 @@ func (s *PeerStatusScorer) Score(pid peer.ID) float64 { // scoreNoLock is a lock-free version of Score. func (s *PeerStatusScorer) scoreNoLock(pid peer.ID) float64 { - if s.isBadPeerNoLock(pid) { + if s.isBadPeerNoLock(pid) != nil { return BadPeerScore } score := float64(0) @@ -67,30 +67,34 @@ func (s *PeerStatusScorer) scoreNoLock(pid peer.ID) float64 { } // IsBadPeer states if the peer is to be considered bad. -func (s *PeerStatusScorer) IsBadPeer(pid peer.ID) bool { +func (s *PeerStatusScorer) IsBadPeer(pid peer.ID) error { s.store.RLock() defer s.store.RUnlock() + return s.isBadPeerNoLock(pid) } // isBadPeerNoLock is lock-free version of IsBadPeer. -func (s *PeerStatusScorer) isBadPeerNoLock(pid peer.ID) bool { +func (s *PeerStatusScorer) isBadPeerNoLock(pid peer.ID) error { peerData, ok := s.store.PeerData(pid) if !ok { - return false + return nil } + // Mark peer as bad, if the latest error is one of the terminal ones. terminalErrs := []error{ p2ptypes.ErrWrongForkDigestVersion, p2ptypes.ErrInvalidFinalizedRoot, p2ptypes.ErrInvalidRequest, } + for _, err := range terminalErrs { if errors.Is(peerData.ChainStateValidationError, err) { - return true + return err } } - return false + + return nil } // BadPeers returns the peers that are considered bad. 
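The peer-status scorer above walks a fixed list of terminal errors with errors.Is and returns the matching sentinel, so wrapped chain-state validation errors still produce a bad-peer verdict. A self-contained sketch of that matching loop (the sentinels below are stand-ins for the p2ptypes errors):

    package main

    import (
    	"errors"
    	"fmt"
    )

    var (
    	errWrongForkDigest = errors.New("wrong fork digest version")
    	errInvalidRequest  = errors.New("invalid request")
    )

    // terminalError returns the first terminal sentinel that validationErr
    // wraps, or nil when none matches.
    func terminalError(validationErr error) error {
    	for _, terminal := range []error{errWrongForkDigest, errInvalidRequest} {
    		if errors.Is(validationErr, terminal) {
    			return terminal
    		}
    	}
    	return nil
    }

    func main() {
    	wrapped := fmt.Errorf("status handshake: %w", errWrongForkDigest)
    	fmt.Println(terminalError(wrapped)) // wrong fork digest version
    }
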
@@ -100,7 +104,7 @@ func (s *PeerStatusScorer) BadPeers() []peer.ID { badPeers := make([]peer.ID, 0) for pid := range s.store.Peers() { - if s.isBadPeerNoLock(pid) { + if s.isBadPeerNoLock(pid) != nil { badPeers = append(badPeers, pid) } } diff --git a/beacon-chain/p2p/peers/scorers/peer_status_test.go b/beacon-chain/p2p/peers/scorers/peer_status_test.go index 241749068d60..8700abf563d8 100644 --- a/beacon-chain/p2p/peers/scorers/peer_status_test.go +++ b/beacon-chain/p2p/peers/scorers/peer_status_test.go @@ -140,12 +140,12 @@ func TestScorers_PeerStatus_IsBadPeer(t *testing.T) { ScorerParams: &scorers.Config{}, }) pid := peer.ID("peer1") - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid)) - assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid)) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid)) + assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid)) peerStatuses.Scorers().PeerStatusScorer().SetPeerStatus(pid, &pb.Status{}, p2ptypes.ErrWrongForkDigestVersion) - assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer(pid)) - assert.Equal(t, true, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid)) + assert.NotNil(t, peerStatuses.Scorers().IsBadPeer(pid)) + assert.NotNil(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid)) } func TestScorers_PeerStatus_BadPeers(t *testing.T) { @@ -155,22 +155,22 @@ func TestScorers_PeerStatus_BadPeers(t *testing.T) { pid1 := peer.ID("peer1") pid2 := peer.ID("peer2") pid3 := peer.ID("peer3") - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid1)) - assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid1)) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid2)) - assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid2)) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid3)) - assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid3)) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid1)) + assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid1)) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid2)) + assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid2)) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid3)) + assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid3)) peerStatuses.Scorers().PeerStatusScorer().SetPeerStatus(pid1, &pb.Status{}, p2ptypes.ErrWrongForkDigestVersion) peerStatuses.Scorers().PeerStatusScorer().SetPeerStatus(pid2, &pb.Status{}, nil) peerStatuses.Scorers().PeerStatusScorer().SetPeerStatus(pid3, &pb.Status{}, p2ptypes.ErrWrongForkDigestVersion) - assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer(pid1)) - assert.Equal(t, true, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid1)) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer(pid2)) - assert.Equal(t, false, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid2)) - assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer(pid3)) - assert.Equal(t, true, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid3)) + assert.NotNil(t, peerStatuses.Scorers().IsBadPeer(pid1)) + assert.NotNil(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid1)) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer(pid2)) + assert.NoError(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid2)) + assert.NotNil(t, peerStatuses.Scorers().IsBadPeer(pid3)) + assert.NotNil(t, peerStatuses.Scorers().PeerStatusScorer().IsBadPeer(pid3)) 
assert.Equal(t, 2, len(peerStatuses.Scorers().PeerStatusScorer().BadPeers())) assert.Equal(t, 2, len(peerStatuses.Scorers().BadPeers())) } diff --git a/beacon-chain/p2p/peers/scorers/service.go b/beacon-chain/p2p/peers/scorers/service.go index 4ae91fc499ab..108315882cc5 100644 --- a/beacon-chain/p2p/peers/scorers/service.go +++ b/beacon-chain/p2p/peers/scorers/service.go @@ -6,6 +6,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata" "github.com/prysmaticlabs/prysm/v5/config/features" ) @@ -24,7 +25,7 @@ const BadPeerScore = gossipThreshold // Scorer defines minimum set of methods every peer scorer must expose. type Scorer interface { Score(pid peer.ID) float64 - IsBadPeer(pid peer.ID) bool + IsBadPeer(pid peer.ID) error BadPeers() []peer.ID } @@ -124,26 +125,29 @@ func (s *Service) ScoreNoLock(pid peer.ID) float64 { } // IsBadPeer traverses all the scorers to see if any of them classifies peer as bad. -func (s *Service) IsBadPeer(pid peer.ID) bool { +func (s *Service) IsBadPeer(pid peer.ID) error { s.store.RLock() defer s.store.RUnlock() return s.IsBadPeerNoLock(pid) } // IsBadPeerNoLock is a lock-free version of IsBadPeer. -func (s *Service) IsBadPeerNoLock(pid peer.ID) bool { - if s.scorers.badResponsesScorer.isBadPeerNoLock(pid) { - return true +func (s *Service) IsBadPeerNoLock(pid peer.ID) error { + if err := s.scorers.badResponsesScorer.isBadPeerNoLock(pid); err != nil { + return errors.Wrap(err, "bad responses scorer") } - if s.scorers.peerStatusScorer.isBadPeerNoLock(pid) { - return true + + if err := s.scorers.peerStatusScorer.isBadPeerNoLock(pid); err != nil { + return errors.Wrap(err, "peer status scorer") } + if features.Get().EnablePeerScorer { - if s.scorers.gossipScorer.isBadPeerNoLock(pid) { - return true + if err := s.scorers.gossipScorer.isBadPeerNoLock(pid); err != nil { + return errors.Wrap(err, "gossip scorer") } } - return false + + return nil } // BadPeers returns the peers that are considered bad by any of registered scorers. 
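IsBadPeerNoLock above consults each sub-scorer in turn and wraps its verdict with the scorer's name, so the eventual disconnection log pinpoints which scorer flagged the peer. A sketch of that aggregation shape (checkAll and the check names are illustrative, not part of the patch):

    package main

    import (
    	"fmt"

    	"github.com/pkg/errors"
    )

    type namedCheck struct {
    	name  string
    	check func() error
    }

    // checkAll returns the first failing check's error, wrapped with the
    // check's name for context, mirroring the composite scorer service.
    func checkAll(checks []namedCheck) error {
    	for _, c := range checks {
    		if err := c.check(); err != nil {
    			return errors.Wrap(err, c.name)
    		}
    	}
    	return nil
    }

    func main() {
    	err := checkAll([]namedCheck{
    		{name: "bad responses scorer", check: func() error { return nil }},
    		{name: "peer status scorer", check: func() error { return errors.New("wrong fork digest version") }},
    	})
    	fmt.Println(err) // peer status scorer: wrong fork digest version
    }
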
@@ -153,7 +157,7 @@ func (s *Service) BadPeers() []peer.ID { badPeers := make([]peer.ID, 0) for pid := range s.store.Peers() { - if s.IsBadPeerNoLock(pid) { + if s.IsBadPeerNoLock(pid) != nil { badPeers = append(badPeers, pid) } } diff --git a/beacon-chain/p2p/peers/scorers/service_test.go b/beacon-chain/p2p/peers/scorers/service_test.go index f7f7aa9c5079..582e4992b4a9 100644 --- a/beacon-chain/p2p/peers/scorers/service_test.go +++ b/beacon-chain/p2p/peers/scorers/service_test.go @@ -237,7 +237,7 @@ func TestScorers_Service_loop(t *testing.T) { for i := 0; i < s1.Params().Threshold+5; i++ { s1.Increment(pid1) } - assert.Equal(t, true, s1.IsBadPeer(pid1), "Peer should be marked as bad") + assert.NotNil(t, s1.IsBadPeer(pid1), "Peer should be marked as bad") s2.IncrementProcessedBlocks("peer1", 221) assert.Equal(t, uint64(221), s2.ProcessedBlocks("peer1")) @@ -252,7 +252,7 @@ func TestScorers_Service_loop(t *testing.T) { for { select { case <-ticker.C: - if s1.IsBadPeer(pid1) == false && s2.ProcessedBlocks("peer1") == 0 { + if s1.IsBadPeer(pid1) == nil && s2.ProcessedBlocks("peer1") == 0 { return } case <-ctx.Done(): @@ -263,7 +263,7 @@ func TestScorers_Service_loop(t *testing.T) { }() <-done - assert.Equal(t, false, s1.IsBadPeer(pid1), "Peer should not be marked as bad") + assert.NoError(t, s1.IsBadPeer(pid1), "Peer should not be marked as bad") assert.Equal(t, uint64(0), s2.ProcessedBlocks("peer1"), "No blocks are expected") } @@ -278,10 +278,10 @@ func TestScorers_Service_IsBadPeer(t *testing.T) { }, }) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer1")) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer1")) peerStatuses.Scorers().BadResponsesScorer().Increment("peer1") peerStatuses.Scorers().BadResponsesScorer().Increment("peer1") - assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer("peer1")) + assert.NotNil(t, peerStatuses.Scorers().IsBadPeer("peer1")) } func TestScorers_Service_BadPeers(t *testing.T) { @@ -295,16 +295,16 @@ func TestScorers_Service_BadPeers(t *testing.T) { }, }) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer1")) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer2")) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer3")) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer1")) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer2")) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer3")) assert.Equal(t, 0, len(peerStatuses.Scorers().BadPeers())) for _, pid := range []peer.ID{"peer1", "peer3"} { peerStatuses.Scorers().BadResponsesScorer().Increment(pid) peerStatuses.Scorers().BadResponsesScorer().Increment(pid) } - assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer("peer1")) - assert.Equal(t, false, peerStatuses.Scorers().IsBadPeer("peer2")) - assert.Equal(t, true, peerStatuses.Scorers().IsBadPeer("peer3")) + assert.NotNil(t, peerStatuses.Scorers().IsBadPeer("peer1")) + assert.NoError(t, peerStatuses.Scorers().IsBadPeer("peer2")) + assert.NotNil(t, peerStatuses.Scorers().IsBadPeer("peer3")) assert.Equal(t, 2, len(peerStatuses.Scorers().BadPeers())) } diff --git a/beacon-chain/p2p/peers/status.go b/beacon-chain/p2p/peers/status.go index d73007ffcf37..fb234a15c1a4 100644 --- a/beacon-chain/p2p/peers/status.go +++ b/beacon-chain/p2p/peers/status.go @@ -34,6 +34,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" + "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" 
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers" @@ -343,19 +344,29 @@ func (p *Status) ChainStateLastUpdated(pid peer.ID) (time.Time, error) { // IsBad states if the peer is to be considered bad (by *any* of the registered scorers). // If the peer is unknown this will return `false`, which makes using this function easier than returning an error. -func (p *Status) IsBad(pid peer.ID) bool { +func (p *Status) IsBad(pid peer.ID) error { p.store.RLock() defer p.store.RUnlock() + return p.isBad(pid) } // isBad is the lock-free version of IsBad. -func (p *Status) isBad(pid peer.ID) bool { +func (p *Status) isBad(pid peer.ID) error { // Do not disconnect from trusted peers. if p.store.IsTrustedPeer(pid) { - return false + return nil + } + + if err := p.isfromBadIP(pid); err != nil { + return errors.Wrap(err, "peer is from a bad IP") } - return p.isfromBadIP(pid) || p.scorers.IsBadPeerNoLock(pid) + + if err := p.scorers.IsBadPeerNoLock(pid); err != nil { + return errors.Wrap(err, "is bad peer no lock") + } + + return nil } // NextValidTime gets the earliest possible time it is to contact/dial @@ -600,7 +611,7 @@ func (p *Status) Prune() { return } notBadPeer := func(pid peer.ID) bool { - return !p.isBad(pid) + return p.isBad(pid) == nil } notTrustedPeer := func(pid peer.ID) bool { return !p.isTrustedPeers(pid) @@ -990,24 +1001,28 @@ func (p *Status) isTrustedPeers(pid peer.ID) bool { // this method assumes the store lock is acquired before // executing the method. -func (p *Status) isfromBadIP(pid peer.ID) bool { +func (p *Status) isfromBadIP(pid peer.ID) error { peerData, ok := p.store.PeerData(pid) if !ok { - return false + return nil } + if peerData.Address == nil { - return false + return nil } + ip, err := manet.ToIP(peerData.Address) if err != nil { - return true + return errors.Wrap(err, "to ip") } + if val, ok := p.ipTracker[ip.String()]; ok { if val > CollocationLimit { - return true + return errors.Errorf("colocation limit exceeded: got %d - limit %d", val, CollocationLimit) } } - return false + + return nil } func (p *Status) addIpToTracker(pid peer.ID) { diff --git a/beacon-chain/p2p/peers/status_test.go b/beacon-chain/p2p/peers/status_test.go index ae57af71f107..4453156438c7 100644 --- a/beacon-chain/p2p/peers/status_test.go +++ b/beacon-chain/p2p/peers/status_test.go @@ -347,7 +347,7 @@ func TestPeerBadResponses(t *testing.T) { require.NoError(t, err) } - assert.Equal(t, false, p.IsBad(id), "Peer marked as bad when should be good") + assert.NoError(t, p.IsBad(id), "Peer marked as bad when should be good") address, err := ma.NewMultiaddr("/ip4/213.202.254.180/tcp/13000") require.NoError(t, err, "Failed to create address") @@ -358,25 +358,25 @@ func TestPeerBadResponses(t *testing.T) { resBadResponses, err := scorer.Count(id) require.NoError(t, err) assert.Equal(t, 0, resBadResponses, "Unexpected bad responses") - assert.Equal(t, false, p.IsBad(id), "Peer marked as bad when should be good") + assert.NoError(t, p.IsBad(id), "Peer marked as bad when should be good") scorer.Increment(id) resBadResponses, err = scorer.Count(id) require.NoError(t, err) assert.Equal(t, 1, resBadResponses, "Unexpected bad responses") - assert.Equal(t, false, p.IsBad(id), "Peer marked as bad when should be good") + assert.NoError(t, p.IsBad(id), "Peer marked as bad when should be good") scorer.Increment(id) resBadResponses, err = scorer.Count(id) require.NoError(t, err) assert.Equal(t, 2, resBadResponses, "Unexpected bad 
responses") - assert.Equal(t, true, p.IsBad(id), "Peer not marked as bad when it should be") + assert.NotNil(t, p.IsBad(id), "Peer not marked as bad when it should be") scorer.Increment(id) resBadResponses, err = scorer.Count(id) require.NoError(t, err) assert.Equal(t, 3, resBadResponses, "Unexpected bad responses") - assert.Equal(t, true, p.IsBad(id), "Peer not marked as bad when it should be") + assert.NotNil(t, p.IsBad(id), "Peer not marked as bad when it should be") } func TestAddMetaData(t *testing.T) { @@ -574,7 +574,7 @@ func TestPeerIPTracker(t *testing.T) { badPeers = append(badPeers, createPeer(t, p, addr, network.DirUnknown, peerdata.PeerConnectionState(ethpb.ConnectionState_DISCONNECTED))) } for _, pr := range badPeers { - assert.Equal(t, true, p.IsBad(pr), "peer with bad ip is not bad") + assert.NotNil(t, p.IsBad(pr), "peer with bad ip is not bad") } // Add in bad peers, so that our records are trimmed out @@ -587,7 +587,7 @@ func TestPeerIPTracker(t *testing.T) { p.Prune() for _, pr := range badPeers { - assert.Equal(t, false, p.IsBad(pr), "peer with good ip is regarded as bad") + assert.NoError(t, p.IsBad(pr), "peer with good ip is regarded as bad") } } diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index c7d7147a6d64..6dc9f222f5e5 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -481,8 +481,8 @@ func (s *Service) connectWithPeer(ctx context.Context, info peer.AddrInfo) error if info.ID == s.host.ID() { return nil } - if s.Peers().IsBad(info.ID) { - return errors.New("refused to connect to bad peer") + if err := s.Peers().IsBad(info.ID); err != nil { + return errors.Wrap(err, "refused to connect to bad peer") } ctx, cancel := context.WithTimeout(ctx, maxDialTimeout) defer cancel() diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index a6caf2989afe..e0f1aa670b97 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -355,7 +355,7 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra // No suitable peer, exit early. 
if len(bestPeers) == 0 { - log.WithField("roots", fmt.Sprintf("%#x", roots)).Debug("Send batch root request: No suited peers") + log.WithField("roots", fmt.Sprintf("%#x", roots)).Debug("Send batch root request: No suitable peers") return nil } diff --git a/beacon-chain/sync/rate_limiter_test.go b/beacon-chain/sync/rate_limiter_test.go index c05b0f18ea66..25f8f9472102 100644 --- a/beacon-chain/sync/rate_limiter_test.go +++ b/beacon-chain/sync/rate_limiter_test.go @@ -97,7 +97,7 @@ func TestRateLimiter_ExceedRawCapacity(t *testing.T) { for i := 0; i < defaultBurstLimit; i++ { assert.ErrorContains(t, p2ptypes.ErrRateLimited.Error(), rlimiter.validateRawRpcRequest(stream)) } - assert.Equal(t, true, p1.Peers().IsBad(p2.PeerID()), "peer is not marked as a bad peer") + assert.NotNil(t, p1.Peers().IsBad(p2.PeerID()), "peer is not marked as a bad peer") require.NoError(t, stream.Close(), "could not close stream") if util.WaitTimeout(&wg, 1*time.Second) { diff --git a/beacon-chain/sync/rpc.go b/beacon-chain/sync/rpc.go index e494b2267e59..804ba732e834 100644 --- a/beacon-chain/sync/rpc.go +++ b/beacon-chain/sync/rpc.go @@ -164,6 +164,9 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) { ctx, cancel := context.WithTimeout(s.ctx, ttfbTimeout) defer cancel() + conn := stream.Conn() + remotePeer := conn.RemotePeer() + // Resetting after closing is a no-op so defer a reset in case something goes wrong. // It's up to the handler to Close the stream (send an EOF) if // it successfully writes a response. We don't blindly call @@ -183,12 +186,13 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) { ctx, span := trace.StartSpan(ctx, "sync.rpc") defer span.End() span.SetAttributes(trace.StringAttribute("topic", topic)) - span.SetAttributes(trace.StringAttribute("peer", stream.Conn().RemotePeer().String())) + span.SetAttributes(trace.StringAttribute("peer", remotePeer.String())) log := log.WithField("peer", stream.Conn().RemotePeer().String()).WithField("topic", string(stream.Protocol())) // Check before hand that peer is valid. - if s.cfg.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { - if err := s.sendGoodByeAndDisconnect(ctx, p2ptypes.GoodbyeCodeBanned, stream.Conn().RemotePeer()); err != nil { + if err := s.cfg.p2p.Peers().IsBad(remotePeer); err != nil { + log.WithError(err).WithField("peer", remotePeer).Debug("Sending goodbye and disconnect") + if err := s.sendGoodByeAndDisconnect(ctx, p2ptypes.GoodbyeCodeBanned, remotePeer); err != nil { log.WithError(err).Debug("Could not disconnect from peer") } return diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index 865195f9806c..e1a92b3ca069 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -16,6 +16,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace" pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/time/slots" + "github.com/sirupsen/logrus" ) // beaconBlocksByRangeRPCHandler looks up the request blocks from the database from a given start block. 
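registerRPC above now logs the concrete badness reason before issuing the goodbye, instead of silently dropping the stream. A hedged sketch of that pre-handler guard (isBad, sendGoodbyeAndDisconnect, and the banned code are placeholders for the Prysm internals, not their real signatures):

    package main

    import (
    	"context"
    	"errors"
    	"fmt"
    )

    type goodbyeCode uint64

    const goodbyeCodeBanned goodbyeCode = 3

    func isBad(peerID string) error { return errors.New("colocation limit exceeded") }

    func sendGoodbyeAndDisconnect(_ context.Context, code goodbyeCode, peerID string) error {
    	fmt.Printf("goodbye %d sent to %s\n", code, peerID)
    	return nil
    }

    // handleStream shows the guard shape: surface the reason, say goodbye, and
    // bail out before the real RPC handler ever runs.
    func handleStream(ctx context.Context, peerID string) {
    	if err := isBad(peerID); err != nil {
    		fmt.Printf("peer %s: %v: sending goodbye and disconnect\n", peerID, err)
    		if err := sendGoodbyeAndDisconnect(ctx, goodbyeCodeBanned, peerID); err != nil {
    			fmt.Println("could not disconnect from peer:", err)
    		}
    		return
    	}
    	// ... dispatch to the registered handler ...
    }

    func main() { handleStream(context.Background(), "peer1") }
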
@@ -26,15 +27,23 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa defer cancel() SetRPCStreamDeadlines(stream) + remotePeer := stream.Conn().RemotePeer() + m, ok := msg.(*pb.BeaconBlocksByRangeRequest) if !ok { return errors.New("message is not type *pb.BeaconBlockByRangeRequest") } - log.WithField("startSlot", m.StartSlot).WithField("count", m.Count).Debug("Serving block by range request") + + log.WithFields(logrus.Fields{ + "startSlot": m.StartSlot, + "count": m.Count, + "peer": remotePeer, + }).Debug("Serving block by range request") + rp, err := validateRangeRequest(m, s.cfg.clock.CurrentSlot()) if err != nil { s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) - s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) + s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer) tracing.AnnotateError(span, err) return err } @@ -50,12 +59,12 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa if err != nil { return err } - remainingBucketCapacity := blockLimiter.Remaining(stream.Conn().RemotePeer().String()) + remainingBucketCapacity := blockLimiter.Remaining(remotePeer.String()) span.SetAttributes( trace.Int64Attribute("start", int64(rp.start)), // lint:ignore uintcast -- This conversion is OK for tracing. trace.Int64Attribute("end", int64(rp.end)), // lint:ignore uintcast -- This conversion is OK for tracing. trace.Int64Attribute("count", int64(m.Count)), - trace.StringAttribute("peer", stream.Conn().RemotePeer().String()), + trace.StringAttribute("peer", remotePeer.String()), trace.Int64Attribute("remaining_capacity", remainingBucketCapacity), ) @@ -82,12 +91,21 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa } rpcBlocksByRangeResponseLatency.Observe(float64(time.Since(batchStart).Milliseconds())) } + if err := batch.error(); err != nil { - log.WithError(err).Debug("error in BlocksByRange batch") - s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) + log.WithError(err).Debug("Serving block by range request - BlocksByRange batch") + + // If we hit a rate limit, the error response has already been written, and the stream is already closed. + if !errors.Is(err, p2ptypes.ErrRateLimited) { + s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) + } + tracing.AnnotateError(span, err) return err } + + log.Debug("Serving block by range request") + closeStream(stream, log) return nil } diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_range.go b/beacon-chain/sync/rpc_blob_sidecars_by_range.go index 7c60beb3234b..09f282201929 100644 --- a/beacon-chain/sync/rpc_blob_sidecars_by_range.go +++ b/beacon-chain/sync/rpc_blob_sidecars_by_range.go @@ -99,6 +99,7 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa } var batch blockBatch + wQuota := params.BeaconConfig().MaxRequestBlobSidecars for batch, ok = batcher.next(ctx, stream); ok; batch, ok = batcher.next(ctx, stream) { batchStart := time.Now() @@ -114,7 +115,12 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa } if err := batch.error(); err != nil { log.WithError(err).Debug("error in BlobSidecarsByRange batch") - s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) + + // If we hit a rate limit, the error response has already been written, and the stream is already closed. 
+ if !errors.Is(err, p2ptypes.ErrRateLimited) { + s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) + } + tracing.AnnotateError(span, err) return err } diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_range.go b/beacon-chain/sync/rpc_data_column_sidecars_by_range.go index 33a1dcd1f638..5868ac30afa5 100644 --- a/beacon-chain/sync/rpc_data_column_sidecars_by_range.go +++ b/beacon-chain/sync/rpc_data_column_sidecars_by_range.go @@ -118,7 +118,12 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i } if err := batch.error(); err != nil { log.WithError(err).Debug("error in DataColumnSidecarsByRange batch") - s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) + + // If we hit a rate limit, the error response has already been written, and the stream is already closed. + if !errors.Is(err, p2ptypes.ErrRateLimited) { + s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) + } + tracing.AnnotateError(span, err) return err } diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go index bf6777410264..e9b62b4f0e90 100644 --- a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go +++ b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "slices" "sort" "time" @@ -25,6 +26,17 @@ import ( "github.com/sirupsen/logrus" ) +// uint64MapToSortedSlice produces a sorted uint64 slice from a map. +func uint64MapToSortedSlice(input map[uint64]bool) []uint64 { + output := make([]uint64, 0, len(input)) + for idx := range input { + output = append(output, idx) + } + + slices.Sort[[]uint64](output) + return output +} + func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { ctx, span := trace.StartSpan(ctx, "sync.dataColumnSidecarByRootRPCHandler") defer span.End() @@ -43,6 +55,8 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int } requestedColumnIdents := *ref + requestedColumnsCount := uint64(len(requestedColumnIdents)) + if err := validateDataColumnsByRootRequest(requestedColumnIdents); err != nil { s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) @@ -78,29 +92,35 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int return errors.Wrap(err, "custody columns") } + numberOfColumns := params.BeaconConfig().NumberOfColumns + + var ( + custodied interface{} = "all" + requested interface{} = "all" + ) + + custodiedColumnsCount := uint64(len(custodiedColumns)) + + if custodiedColumnsCount != numberOfColumns { + custodied = uint64MapToSortedSlice(custodiedColumns) + } + + if requestedColumnsCount != numberOfColumns { + requested = requestedColumnsList + } + custodiedColumnsList := make([]uint64, 0, len(custodiedColumns)) for column := range custodiedColumns { custodiedColumnsList = append(custodiedColumnsList, column) } // Sort the custodied columns by index. 
- sort.Slice(custodiedColumnsList, func(i, j int) bool { - return custodiedColumnsList[i] < custodiedColumnsList[j] - }) - - fields := logrus.Fields{ - "requested": requestedColumnsList, - "custodiedCount": len(custodiedColumnsList), - "requestedCount": len(requestedColumnsList), - } - - if uint64(len(custodiedColumnsList)) == params.BeaconConfig().NumberOfColumns { - fields["custodied"] = "all" - } else { - fields["custodied"] = custodiedColumnsList - } + slices.Sort[[]uint64](custodiedColumnsList) - log.WithFields(fields).Debug("Data column sidecar by root request received") + log.WithFields(logrus.Fields{ + "custodied": custodied, + "requested": requested, + }).Debug("Data column sidecar by root request received") // Subscribe to the data column feed. rootIndexChan := make(chan filesystem.RootIndexPair) diff --git a/beacon-chain/sync/rpc_goodbye.go b/beacon-chain/sync/rpc_goodbye.go index 4c27e0b55cf8..03f0789c87cf 100644 --- a/beacon-chain/sync/rpc_goodbye.go +++ b/beacon-chain/sync/rpc_goodbye.go @@ -55,10 +55,7 @@ func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream l // disconnectBadPeer checks whether peer is considered bad by some scorer, and tries to disconnect // the peer, if that is the case. Additionally, disconnection reason is obtained from scorer. -func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID) { - if !s.cfg.p2p.Peers().IsBad(id) { - return - } +func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID, badPeerErr error) { err := s.cfg.p2p.Peers().Scorers().ValidationError(id) goodbyeCode := p2ptypes.ErrToGoodbyeCode(err) if err == nil { @@ -67,6 +64,8 @@ func (s *Service) disconnectBadPeer(ctx context.Context, id peer.ID) { if err := s.sendGoodByeAndDisconnect(ctx, goodbyeCode, id); err != nil { log.WithError(err).Debug("Error when disconnecting with bad peer") } + + log.WithError(badPeerErr).WithField("peerID", id).Debug("Initiate peer disconnection") } // A custom goodbye method that is used by our connection handler, in the diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index e688301d66de..0ad9ffd21f33 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -25,7 +25,7 @@ import ( "github.com/sirupsen/logrus" ) -// maintainPeerStatuses by infrequently polling peers for their latest status. +// maintainPeerStatuses maintain peer statuses by polling peers for their latest status twice per epoch. func (s *Service) maintainPeerStatuses() { // Run twice per epoch. interval := time.Duration(params.BeaconConfig().SlotsPerEpoch.Div(2).Mul(params.BeaconConfig().SecondsPerSlot)) * time.Second @@ -43,11 +43,15 @@ func (s *Service) maintainPeerStatuses() { log.WithError(err).Debug("Error when disconnecting with peer") } s.cfg.p2p.Peers().SetConnectionState(id, peers.PeerDisconnected) + log.WithFields(logrus.Fields{ + "peer": id, + "reason": "maintain peer statuses - peer is not connected", + }).Debug("Initiate peer disconnection") return } // Disconnect from peers that are considered bad by any of the registered scorers. - if s.cfg.p2p.Peers().IsBad(id) { - s.disconnectBadPeer(s.ctx, id) + if err := s.cfg.p2p.Peers().IsBad(id); err != nil { + s.disconnectBadPeer(s.ctx, id, err) return } // If the status hasn't been updated in the recent interval time. 
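Several handlers in this patch log the string "all" instead of dumping every index when a column set is complete, and otherwise flatten the map keys into a sorted slice via the new uint64MapToSortedSlice helper. A compact sketch of that logging trick (numberOfColumns stands in for params.BeaconConfig().NumberOfColumns):

    package main

    import (
    	"fmt"
    	"slices"
    )

    const numberOfColumns = 128

    // uint64MapToSortedSlice mirrors the helper added in
    // rpc_data_column_sidecars_by_root.go: map keys out, sorted ascending.
    func uint64MapToSortedSlice(input map[uint64]bool) []uint64 {
    	output := make([]uint64, 0, len(input))
    	for idx := range input {
    		output = append(output, idx)
    	}
    	slices.Sort(output)
    	return output
    }

    func main() {
    	custodiedColumns := map[uint64]bool{5: true, 1: true, 3: true}

    	// Log "all" when every column is present, else the explicit sorted list.
    	var custodied interface{} = "all"
    	if uint64(len(custodiedColumns)) != numberOfColumns {
    		custodied = uint64MapToSortedSlice(custodiedColumns)
    	}
    	fmt.Println(custodied) // [1 3 5]
    }
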
@@ -73,6 +77,11 @@ func (s *Service) maintainPeerStatuses() { if err := s.sendGoodByeAndDisconnect(s.ctx, p2ptypes.GoodbyeCodeTooManyPeers, id); err != nil { log.WithField("peer", id).WithError(err).Debug("Could not disconnect with peer") } + + log.WithFields(logrus.Fields{ + "peer": id, + "reason": "to be pruned", + }).Debug("Initiate peer disconnection") } }) } @@ -169,8 +178,8 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error { // If validation fails, validation error is logged, and peer status scorer will mark peer as bad. err = s.validateStatusMessage(ctx, msg) s.cfg.p2p.Peers().Scorers().PeerStatusScorer().SetPeerStatus(id, msg, err) - if s.cfg.p2p.Peers().IsBad(id) { - s.disconnectBadPeer(s.ctx, id) + if err := s.cfg.p2p.Peers().IsBad(id); err != nil { + s.disconnectBadPeer(s.ctx, id, err) } return err } diff --git a/beacon-chain/sync/rpc_status_test.go b/beacon-chain/sync/rpc_status_test.go index 67323129d604..e5945b571c89 100644 --- a/beacon-chain/sync/rpc_status_test.go +++ b/beacon-chain/sync/rpc_status_test.go @@ -877,7 +877,7 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) { require.NoError(t, cw.SetClock(startup.NewClock(chain.Genesis, chain.ValidatorsRoot))) - assert.Equal(t, false, p1.Peers().Scorers().IsBadPeer(p2.PeerID()), "Peer is marked as bad") + assert.NoError(t, p1.Peers().Scorers().IsBadPeer(p2.PeerID()), "Peer is marked as bad") p1.Connect(p2) if util.WaitTimeout(&wg, time.Second) { @@ -889,7 +889,7 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) { require.NoError(t, err, "Could not obtain peer connection state") assert.Equal(t, peers.PeerDisconnected, connectionState, "Expected peer to be disconnected") - assert.Equal(t, true, p1.Peers().Scorers().IsBadPeer(p2.PeerID()), "Peer is not marked as bad") + assert.NotNil(t, p1.Peers().Scorers().IsBadPeer(p2.PeerID()), "Peer is not marked as bad") } func TestStatusRPC_ValidGenesisMessage(t *testing.T) { diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 942d8422ceb2..aa0364107349 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -391,7 +391,7 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, } // Check every slot that there are enough peers for i := uint64(0); i < subnetCount; i++ { - if !s.validPeersExist(s.addDigestAndIndexToTopic(topic, digest, i)) { + if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) { _, err := s.cfg.p2p.FindPeersWithSubnet( s.ctx, s.addDigestAndIndexToTopic(topic, digest, i), @@ -513,7 +513,7 @@ func (s *Service) subscribeAggregatorSubnet( if _, exists := subscriptions[idx]; !exists { subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle) } - if !s.validPeersExist(subnetTopic) { + if !s.enoughPeersAreConnected(subnetTopic) { _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) if err != nil { log.WithError(err).Debug("Could not search for peers") @@ -566,7 +566,7 @@ func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrapped } // Check every slot that there are enough peers for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ { - if !s.validPeersExist(s.addDigestAndIndexToTopic(topic, digest, i)) { + if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) { _, err := s.cfg.p2p.FindPeersWithSubnet( s.ctx, s.addDigestAndIndexToTopic(topic, digest, i), @@ -646,7 +646,7 @@ func (s *Service) 
subscribeToSyncSubnets( subnetTopic := fmt.Sprintf(topic, digest, subnetIndex) // Check if we have enough peers in the subnet. Skip if we do. - if s.validPeersExist(subnetTopic) { + if s.enoughPeersAreConnected(subnetTopic) { continue } @@ -677,13 +677,13 @@ func (s *Service) subscribeDynamicWithSyncSubnets( genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot() // Retrieve the epoch of the fork corresponding to the digest. - _, e, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:]) + _, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:]) if err != nil { panic(err) } // Retrieve the base protobuf message. - base := p2p.GossipTopicMappings(topicFormat, e) + base := p2p.GossipTopicMappings(topicFormat, epoch) if base == nil { panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) } @@ -740,7 +740,7 @@ func (s *Service) subscribeColumnSubnet( minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet - if !s.validPeersExist(subnetTopic) { + if !s.enoughPeersAreConnected(subnetTopic) { _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, minimumPeersPerSubnet) if err != nil { log.WithError(err).Debug("Could not search for peers") @@ -811,7 +811,7 @@ func (s *Service) subscribeDynamicWithColumnSubnets( func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) { topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})] subnetTopic := fmt.Sprintf(topic, digest, idx) - if !s.validPeersExist(subnetTopic) { + if !s.enoughPeersAreConnected(subnetTopic) { // perform a search for peers with the desired committee index. _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) if err != nil { @@ -835,8 +835,8 @@ func (s *Service) unSubscribeFromTopic(topic string) { } } -// find if we have peers who are subscribed to the same subnet -func (s *Service) validPeersExist(subnetTopic string) bool { +// enoughPeersAreConnected checks if we have enough peers which are subscribed to the same subnet. +func (s *Service) enoughPeersAreConnected(subnetTopic string) bool { topic := subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix() threshold := flags.Get().MinimumPeersPerSubnet diff --git a/beacon-chain/sync/validate_data_column.go b/beacon-chain/sync/validate_data_column.go index 6f4d4989589d..110640222db8 100644 --- a/beacon-chain/sync/validate_data_column.go +++ b/beacon-chain/sync/validate_data_column.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" + "github.com/prysmaticlabs/prysm/v5/config/features" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" @@ -18,6 +19,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/runtime/logging" prysmTime "github.com/prysmaticlabs/prysm/v5/time" "github.com/prysmaticlabs/prysm/v5/time/slots" + "github.com/sirupsen/logrus" ) // https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/p2p-interface.md#the-gossip-domain-gossipsub @@ -58,6 +60,19 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs return pubsub.ValidationReject, errors.Wrap(err, "roDataColumn conversion failure") } + // Voluntary ignore messages (for debugging purposes). 
diff --git a/beacon-chain/sync/validate_data_column.go b/beacon-chain/sync/validate_data_column.go
index 6f4d4989589d..110640222db8 100644
--- a/beacon-chain/sync/validate_data_column.go
+++ b/beacon-chain/sync/validate_data_column.go
@@ -9,6 +9,7 @@ import (
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/pkg/errors"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
+	"github.com/prysmaticlabs/prysm/v5/config/features"
 	"github.com/prysmaticlabs/prysm/v5/config/params"
 	"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
 	"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
@@ -18,6 +19,7 @@ import (
 	"github.com/prysmaticlabs/prysm/v5/runtime/logging"
 	prysmTime "github.com/prysmaticlabs/prysm/v5/time"
 	"github.com/prysmaticlabs/prysm/v5/time/slots"
+	"github.com/sirupsen/logrus"
 )
 
 // https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/p2p-interface.md#the-gossip-domain-gossipsub
@@ -58,6 +60,19 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
 		return pubsub.ValidationReject, errors.Wrap(err, "roDataColumn conversion failure")
 	}
 
+	// Voluntarily ignore messages (for debugging purposes).
+	dataColumnsIgnoreSlotMultiple := features.Get().DataColumnsIgnoreSlotMultiple
+	blockSlot := uint64(ds.SignedBlockHeader.Header.Slot)
+
+	if dataColumnsIgnoreSlotMultiple != 0 && blockSlot%dataColumnsIgnoreSlotMultiple == 0 {
+		log.WithFields(logrus.Fields{
+			"slot":  blockSlot,
+			"topic": msg.Topic,
+		}).Warning("Voluntarily ignoring data column sidecar gossip")
+
+		return pubsub.ValidationIgnore, nil
+	}
+
 	verifier := s.newColumnVerifier(ds, verification.GossipColumnSidecarRequirements)
 
 	if err := verifier.DataColumnIndexInBounds(); err != nil {
@@ -130,13 +145,20 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
 
 	msg.ValidatorData = verifiedRODataColumn
 
-	fields := logging.DataColumnFields(ds)
 	sinceSlotStartTime := receivedTime.Sub(startTime)
 	validationTime := s.cfg.clock.Now().Sub(receivedTime)
-	fields["sinceSlotStartTime"] = sinceSlotStartTime
-	fields["validationTime"] = validationTime
 
-	log.WithFields(fields).Debug("Accepted data column sidecar gossip")
+	peerGossipScore := s.cfg.p2p.Peers().Scorers().GossipScorer().Score(pid)
+	log.
+		WithFields(logging.DataColumnFields(ds)).
+		WithFields(logrus.Fields{
+			"sinceSlotStartTime": sinceSlotStartTime,
+			"validationTime":     validationTime,
+			"peer":               pid[len(pid)-6:],
+			"peerGossipScore":    peerGossipScore,
+		}).
+		Debug("Accepted data column sidecar gossip")
+
 	return pubsub.ValidationAccept, nil
 }
diff --git a/config/features/config.go b/config/features/config.go
index a8ec0a97f6b3..5bcc180c1c2a 100644
--- a/config/features/config.go
+++ b/config/features/config.go
@@ -84,9 +84,12 @@ type Flags struct {
 	// changed on disk. This feature is for advanced use cases only.
 	KeystoreImportDebounceInterval time.Duration
 
-	// DataColumnsWithholdCount specifies the likelihood of withholding a data column sidecar when proposing a block (percentage)
+	// DataColumnsWithholdCount specifies the number of data columns that should be withheld when proposing a block.
 	DataColumnsWithholdCount uint64
 
+	// DataColumnsIgnoreSlotMultiple specifies that all data columns should be ignored for slots that are a multiple of this value.
+	DataColumnsIgnoreSlotMultiple uint64
+
 	// AggregateIntervals specifies the time durations at which we aggregate attestations preparing for forkchoice.
 	AggregateIntervals [3]time.Duration
 }
@@ -276,6 +279,11 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
 		cfg.DataColumnsWithholdCount = ctx.Uint64(DataColumnsWithholdCount.Name)
 	}
 
+	if ctx.IsSet(DataColumnsIgnoreSlotMultiple.Name) {
+		logEnabled(DataColumnsIgnoreSlotMultiple)
+		cfg.DataColumnsIgnoreSlotMultiple = ctx.Uint64(DataColumnsIgnoreSlotMultiple.Name)
+	}
+
 	cfg.AggregateIntervals = [3]time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
 	Init(cfg)
 	return nil
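The new `DataColumnsIgnoreSlotMultiple` feature flag gates the ignore path added to `validateDataColumn` above. The `!= 0` guard does double duty: it keeps the feature off by default (the flag's zero value) and, because Go's `&&` short-circuits, it prevents a modulo-by-zero panic when the flag is unset. A toy sketch of the gating logic, not the Prysm code:

	// Toy sketch of the slot-multiple gating behind the debug flag.
	package main

	import "fmt"

	func shouldIgnore(slot, ignoreSlotMultiple uint64) bool {
		// Short-circuit: slot%0 is never evaluated when the flag is unset (0).
		return ignoreSlotMultiple != 0 && slot%ignoreSlotMultiple == 0
	}

	func main() {
		// With the flag set to 3, slots 3, 6, 9, ... are voluntarily ignored.
		for slot := uint64(1); slot <= 7; slot++ {
			fmt.Printf("slot %d ignored: %t\n", slot, shouldIgnore(slot, 3))
		}
	}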
diff --git a/config/features/flags.go b/config/features/flags.go
index b6dd6c6449b5..1253a9aedf36 100644
--- a/config/features/flags.go
+++ b/config/features/flags.go
@@ -185,6 +185,13 @@ var (
 		Value:  0,
 		Hidden: true,
 	}
+	// DataColumnsIgnoreSlotMultiple is a flag for ignoring all data columns for slots that are a multiple of its value.
+	DataColumnsIgnoreSlotMultiple = &cli.Uint64Flag{
+		Name:   "data-columns-ignore-slot-multiple",
+		Usage:  "Ignore all data columns for slots that are a multiple of this value. DO NOT USE IN PRODUCTION.",
+		Value:  0,
+		Hidden: true,
+	}
 )
 
 // devModeFlags holds list of flags that are set when development mode is on.
@@ -245,6 +252,7 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
 	EnableDiscoveryReboot,
 	EnablePeerDAS,
 	DataColumnsWithholdCount,
+	DataColumnsIgnoreSlotMultiple,
 }...)...)
 
 // E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.
diff --git a/runtime/logging/data_column.go b/runtime/logging/data_column.go
index 983aa329251b..7aa71153001b 100644
--- a/runtime/logging/data_column.go
+++ b/runtime/logging/data_column.go
@@ -10,18 +10,15 @@ import (
 // DataColumnFields extracts a standard set of fields from a DataColumnSidecar into a logrus.Fields struct
 // which can be passed to log.WithFields.
 func DataColumnFields(column blocks.RODataColumn) logrus.Fields {
-	kzgCommitmentsShort := make([][]byte, 0, len(column.KzgCommitments))
-	for _, kzgCommitment := range column.KzgCommitments {
-		kzgCommitmentsShort = append(kzgCommitmentsShort, kzgCommitment[:3])
-	}
+	kzgCommitmentCount := len(column.KzgCommitments)
 
 	return logrus.Fields{
-		"slot":           column.Slot(),
-		"propIdx":        column.ProposerIndex(),
-		"blockRoot":      fmt.Sprintf("%#x", column.BlockRoot())[:8],
-		"parentRoot":     fmt.Sprintf("%#x", column.ParentRoot())[:8],
-		"kzgCommitments": fmt.Sprintf("%#x", kzgCommitmentsShort),
-		"colIdx":         column.ColumnIndex,
+		"slot":               column.Slot(),
+		"propIdx":            column.ProposerIndex(),
+		"blockRoot":          fmt.Sprintf("%#x", column.BlockRoot())[:8],
+		"parentRoot":         fmt.Sprintf("%#x", column.ParentRoot())[:8],
+		"kzgCommitmentCount": kzgCommitmentCount,
+		"colIdx":             column.ColumnIndex,
 	}
 }
diff --git a/validator/client/propose.go b/validator/client/propose.go
index 3cf5a11fd1dc..a62d73ee814f 100644
--- a/validator/client/propose.go
+++ b/validator/client/propose.go
@@ -195,7 +195,7 @@ func logProposedBlock(log *logrus.Entry, blk interfaces.SignedBeaconBlock, blkRo
 	log = log.WithFields(logrus.Fields{
 		"payloadHash": fmt.Sprintf("%#x", bytesutil.Trunc(p.BlockHash())),
 		"parentHash":  fmt.Sprintf("%#x", bytesutil.Trunc(p.ParentHash())),
-		"blockNumber": p.BlockNumber,
+		"blockNumber": p.BlockNumber(),
 	})
 	if !blk.IsBlinded() {
 		txs, err := p.Transactions()
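The accepted-gossip log in `validateDataColumn` chains two `WithFields` calls; logrus merges both field sets into a single entry, so the per-sidecar fields from `logging.DataColumnFields` and the per-message fields (peer, timings, gossip score) land on one line. A sketch of that merge with fabricated values; the peer ID string below is made up:

	// Sketch of chained logrus WithFields calls merging into one entry.
	package main

	import "github.com/sirupsen/logrus"

	func main() {
		logrus.SetLevel(logrus.DebugLevel) // Debug entries are suppressed at the default Info level.

		pid := "16Uiu2HAmTestPeerIDNBbEub" // hypothetical peer ID string
		logrus.
			WithFields(logrus.Fields{"slot": 1234, "colIdx": 17}). // per-sidecar fields, as from logging.DataColumnFields
			WithFields(logrus.Fields{
				"peer":               pid[len(pid)-6:], // last six characters, matching the hunk above
				"kzgCommitmentCount": 4,
			}).
			Debug("Accepted data column sidecar gossip")
	}

Truncating identifiers this way (last six characters of a peer ID, first eight of a root) keeps the structured logs scannable while leaving enough of each value to correlate entries.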