Skip to content

Commit

Permalink
Peerdas: Several updates (#14459)
Browse files Browse the repository at this point in the history
* `validateDataColumn`: Refactor logging.

* `dataColumnSidecarByRootRPCHandler`: Improve logging.

* `isDataAvailable`: Improve logging.

* Add hidden debug flag: `--data-columns-reject-slot-multiple`.

* Add more logs about peer disconnection.

* `validPeersExist` --> `enoughPeersAreConnected`

* `beaconBlocksByRangeRPCHandler`: Add remote Peer ID in logs.

* Stop calling twice `writeErrorResponseToStream` in case of rate limit.
  • Loading branch information
nalepae committed Oct 24, 2024
1 parent 6b1b04e commit 5ca87d5
Show file tree
Hide file tree
Showing 34 changed files with 400 additions and 215 deletions.
15 changes: 9 additions & 6 deletions beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, si
"root": fmt.Sprintf("%#x", root),
"columnsExpected": expected,
"columnsWaiting": missing,
}).Error("Some data columns are still unavailable at slot end.")
}).Error("Some data columns are still unavailable at slot end")
})

defer nst.Stop()
Expand Down Expand Up @@ -779,12 +779,15 @@ func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, si
return nil
}
case <-ctx.Done():
missingIndexes := make([]uint64, 0, len(missingMap))
for val := range missingMap {
copiedVal := val
missingIndexes = append(missingIndexes, copiedVal)
var missingIndices interface{} = "all"
numberOfColumns := params.BeaconConfig().NumberOfColumns
missingIndicesCount := uint64(len(missingMap))

if missingIndicesCount < numberOfColumns {
missingIndices = uint64MapToSortedSlice(missingMap)
}
return errors.Wrapf(ctx.Err(), "context deadline waiting for data column sidecars slot: %d, BlockRoot: %#x, missing %v", block.Slot(), root, missingIndexes)

return errors.Wrapf(ctx.Err(), "context deadline waiting for data column sidecars slot: %d, BlockRoot: %#x, missing %v", block.Slot(), root, missingIndices)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/connection_gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (*Service) InterceptPeerDial(_ peer.ID) (allow bool) {
// multiaddr for the given peer.
func (s *Service) InterceptAddrDial(pid peer.ID, m multiaddr.Multiaddr) (allow bool) {
// Disallow bad peers from dialing in.
if s.peers.IsBad(pid) {
if s.peers.IsBad(pid) != nil {
return false
}
return filterConnections(s.addrFilter, m)
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ func (s *Service) filterPeer(node *enode.Node) bool {
}

// Ignore bad nodes.
if s.peers.IsBad(peerData.ID) {
if s.peers.IsBad(peerData.ID) != nil {
return false
}

Expand Down
141 changes: 95 additions & 46 deletions beacon-chain/p2p/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package p2p

import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata"
prysmTime "github.com/prysmaticlabs/prysm/v5/time"
Expand All @@ -22,7 +22,57 @@ const (
)

func peerMultiaddrString(conn network.Conn) string {
return fmt.Sprintf("%s/p2p/%s", conn.RemoteMultiaddr().String(), conn.RemotePeer().String())
remoteMultiaddr := conn.RemoteMultiaddr().String()
remotePeerID := conn.RemotePeer().String()
return fmt.Sprintf("%s/p2p/%s", remoteMultiaddr, remotePeerID)
}

func (s *Service) connectToPeer(conn network.Conn) {
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected)
// Go through the handshake process.
log.WithFields(logrus.Fields{
"direction": conn.Stat().Direction.String(),
"multiAddr": peerMultiaddrString(conn),
"activePeers": len(s.peers.Active()),
}).Debug("Initiate peer connection")
}

func (s *Service) disconnectFromPeer(
conn network.Conn,
goodByeFunc func(ctx context.Context, id peer.ID) error,
badPeerErr error,
) {
// Get the remote peer ID.
remotePeerID := conn.RemotePeer()

// Get the direction of the connection.
direction := conn.Stat().Direction.String()

// Get the remote peer multiaddr.
remotePeerMultiAddr := peerMultiaddrString(conn)

// Set the peer to disconnecting state.
s.peers.SetConnectionState(remotePeerID, peers.PeerDisconnecting)

// Only attempt a goodbye if we are still connected to the peer.
if s.host.Network().Connectedness(remotePeerID) == network.Connected {
if err := goodByeFunc(context.TODO(), remotePeerID); err != nil {
log.WithError(err).Error("Unable to disconnect from peer")
}
}

// Get the remaining active peers.
activePeerCount := len(s.peers.Active())
log.
WithError(badPeerErr).
WithFields(logrus.Fields{
"multiaddr": remotePeerMultiAddr,
"direction": direction,
"remainingActivePeers": activePeerCount,
}).
Debug("Initiate peer disconnection")

s.peers.SetConnectionState(remotePeerID, peers.PeerDisconnected)
}

// AddConnectionHandler adds a callback function which handles the connection with a
Expand Down Expand Up @@ -59,16 +109,7 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
s.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(net network.Network, conn network.Conn) {
remotePeer := conn.RemotePeer()
disconnectFromPeer := func() {
s.peers.SetConnectionState(remotePeer, peers.PeerDisconnecting)
// Only attempt a goodbye if we are still connected to the peer.
if s.host.Network().Connectedness(remotePeer) == network.Connected {
if err := goodByeFunc(context.TODO(), remotePeer); err != nil {
log.WithError(err).Error("Unable to disconnect from peer")
}
}
s.peers.SetConnectionState(remotePeer, peers.PeerDisconnected)
}

// Connection handler must be non-blocking as part of libp2p design.
go func() {
if peerHandshaking(remotePeer) {
Expand All @@ -77,30 +118,22 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
return
}
defer peerFinished(remotePeer)

// Handle the various pre-existing conditions that will result in us not handshaking.
peerConnectionState, err := s.peers.ConnectionState(remotePeer)
if err == nil && (peerConnectionState == peers.PeerConnected || peerConnectionState == peers.PeerConnecting) {
log.WithField("currentState", peerConnectionState).WithField("reason", "already active").Trace("Ignoring connection request")
return
}

s.peers.Add(nil /* ENR */, remotePeer, conn.RemoteMultiaddr(), conn.Stat().Direction)

// Defensive check in the event we still get a bad peer.
if s.peers.IsBad(remotePeer) {
log.WithField("reason", "bad peer").Trace("Ignoring connection request")
disconnectFromPeer()
if err := s.peers.IsBad(remotePeer); err != nil {
s.disconnectFromPeer(conn, goodByeFunc, err)
return
}

validPeerConnection := func() {
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected)
// Go through the handshake process.
log.WithFields(logrus.Fields{
"direction": conn.Stat().Direction.String(),
"multiAddr": peerMultiaddrString(conn),
"activePeers": len(s.peers.Active()),
}).Debug("New peer connection")
}

// Do not perform handshake on inbound dials.
if conn.Stat().Direction == network.DirInbound {
_, err := s.peers.ChainState(remotePeer)
Expand All @@ -118,67 +151,83 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
// If peer hasn't sent a status request, we disconnect with them
if _, err := s.peers.ChainState(remotePeer); errors.Is(err, peerdata.ErrPeerUnknown) || errors.Is(err, peerdata.ErrNoPeerStatus) {
statusMessageMissing.Inc()
disconnectFromPeer()
s.disconnectFromPeer(conn, goodByeFunc, errors.Wrap(err, "chain state"))
return
}

if peerExists {
updated, err := s.peers.ChainStateLastUpdated(remotePeer)
if err != nil {
disconnectFromPeer()
s.disconnectFromPeer(conn, goodByeFunc, errors.Wrap(err, "chain state last updated"))
return
}
// exit if we don't receive any current status messages from
// peer.
if updated.IsZero() || !updated.After(currentTime) {
disconnectFromPeer()

// Exit if we don't receive any current status messages from peer.
if updated.IsZero() {
s.disconnectFromPeer(conn, goodByeFunc, errors.New("is zero"))
return
}

if !updated.After(currentTime) {
s.disconnectFromPeer(conn, goodByeFunc, errors.New("did not update"))
return
}
}
validPeerConnection()

s.connectToPeer(conn)
return
}

s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting)
if err := reqFunc(context.TODO(), conn.RemotePeer()); err != nil && !errors.Is(err, io.EOF) {
log.WithError(err).Trace("Handshake failed")
disconnectFromPeer()
s.disconnectFromPeer(conn, goodByeFunc, err)
return
}
validPeerConnection()

s.connectToPeer(conn)
}()
},
})
}

// AddDisconnectionHandler disconnects from peers. It handles updating the peer status.
// AddDisconnectionHandler disconnects from peers. It handles updating the peer status.
// This also calls the handler responsible for maintaining other parts of the sync or p2p system.
func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id peer.ID) error) {
s.host.Network().Notify(&network.NotifyBundle{
DisconnectedF: func(net network.Network, conn network.Conn) {
log := log.WithField("multiAddr", peerMultiaddrString(conn))
remotePeerMultiAddr := peerMultiaddrString(conn)
peerID := conn.RemotePeer()
direction := conn.Stat().Direction.String()

log := log.WithFields(logrus.Fields{
"multiAddr": remotePeerMultiAddr,
"direction": direction,
})

// Must be handled in a goroutine as this callback cannot be blocking.
go func() {
// Exit early if we are still connected to the peer.
if net.Connectedness(conn.RemotePeer()) == network.Connected {
if net.Connectedness(peerID) == network.Connected {
return
}
priorState, err := s.peers.ConnectionState(conn.RemotePeer())

priorState, err := s.peers.ConnectionState(peerID)
if err != nil {
// Can happen if the peer has already disconnected, so...
priorState = peers.PeerDisconnected
}
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting)

s.peers.SetConnectionState(peerID, peers.PeerDisconnecting)
if err := handler(context.TODO(), conn.RemotePeer()); err != nil {
log.WithError(err).Error("Disconnect handler failed")
}
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)

s.peers.SetConnectionState(peerID, peers.PeerDisconnected)

// Only log disconnections if we were fully connected.
if priorState == peers.PeerConnected {
log.WithFields(logrus.Fields{
"direction": conn.Stat().Direction.String(),
"multiAddr": peerMultiaddrString(conn),
"activePeers": len(s.peers.Active()),
}).Debug("Peer disconnected")
activePeersCount := len(s.peers.Active())
log.WithField("remainingActivePeers", activePeersCount).Debug("Peer disconnected")
}
}()
},
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/peers/scorers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//crypto/rand:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
)

Expand Down
19 changes: 13 additions & 6 deletions beacon-chain/p2p/peers/scorers/bad_responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata"
)

Expand Down Expand Up @@ -61,7 +62,7 @@ func (s *BadResponsesScorer) Score(pid peer.ID) float64 {

// scoreNoLock is a lock-free version of Score.
func (s *BadResponsesScorer) scoreNoLock(pid peer.ID) float64 {
if s.isBadPeerNoLock(pid) {
if s.isBadPeerNoLock(pid) != nil {
return BadPeerScore
}
score := float64(0)
Expand Down Expand Up @@ -116,18 +117,24 @@ func (s *BadResponsesScorer) Increment(pid peer.ID) {

// IsBadPeer states if the peer is to be considered bad.
// If the peer is unknown this will return `false`, which makes using this function easier than returning an error.
func (s *BadResponsesScorer) IsBadPeer(pid peer.ID) bool {
func (s *BadResponsesScorer) IsBadPeer(pid peer.ID) error {
s.store.RLock()
defer s.store.RUnlock()

return s.isBadPeerNoLock(pid)
}

// isBadPeerNoLock is lock-free version of IsBadPeer.
func (s *BadResponsesScorer) isBadPeerNoLock(pid peer.ID) bool {
func (s *BadResponsesScorer) isBadPeerNoLock(pid peer.ID) error {
if peerData, ok := s.store.PeerData(pid); ok {
return peerData.BadResponses >= s.config.Threshold
if peerData.BadResponses >= s.config.Threshold {
return errors.Errorf("peer exceeded bad responses threshold: got %d, threshold %d", peerData.BadResponses, s.config.Threshold)
}

return nil
}
return false

return nil
}

// BadPeers returns the peers that are considered bad.
Expand All @@ -137,7 +144,7 @@ func (s *BadResponsesScorer) BadPeers() []peer.ID {

badPeers := make([]peer.ID, 0)
for pid := range s.store.Peers() {
if s.isBadPeerNoLock(pid) {
if s.isBadPeerNoLock(pid) != nil {
badPeers = append(badPeers, pid)
}
}
Expand Down
Loading

0 comments on commit 5ca87d5

Please sign in to comment.