Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Commit

Permalink
[FIX] Bootnode re-discovery issue (#775)
Browse files Browse the repository at this point in the history
* added TemporaryDialPeer method
* added additional logging
* fixed 'failed to dial' debug output
* added missing method to MockNetworkingServer
* added delegate method to test function
  • Loading branch information
ZeljkoBenovic authored Oct 11, 2022
1 parent a7c34d5 commit 9f1ce9b
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 18 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ require (
lukechampine.com/blake3 v1.1.7 // indirect
)

require (
github.com/0xPolygon/go-ibft v0.0.0-20220810095021-e43142f8d267
go.uber.org/atomic v1.10.0
)

require (
github.com/0xPolygon/go-ibft v0.0.0-20220810095021-e43142f8d267
go.uber.org/atomic v1.10.0
Expand Down
15 changes: 12 additions & 3 deletions network/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type networkingServer interface {
// RemoveTemporaryDial removes a peer from the temporary dial map
RemoveTemporaryDial(peerID peer.ID)

// TemporaryDialPeer dials the peer temporarily
TemporaryDialPeer(peerAddrInfo *peer.AddrInfo)

// CONNECTION INFORMATION //

// HasFreeConnectionSlot checks if there is an available connection slot for the set direction [Thread safe]
Expand Down Expand Up @@ -300,6 +303,7 @@ func (d *DiscoveryService) regularPeerDiscovery() {
return
}

d.logger.Debug("running regular peer discovery", "peer", peerID.String())
// Try to discover the peers connected to the reference peer
if err := d.attemptToFindPeers(*peerID); err != nil {
d.logger.Error(
Expand All @@ -312,7 +316,7 @@ func (d *DiscoveryService) regularPeerDiscovery() {
}
}

// bootnodeDiscovery queries a random (unconnected) bootnode for new peers
// bootnodePeerDiscovery queries a random (unconnected) bootnode for new peers
// and adds them to the routing table
func (d *DiscoveryService) bootnodePeerDiscovery() {
if !d.baseServer.HasFreeConnectionSlot(network.DirOutbound) {
Expand All @@ -331,7 +335,6 @@ func (d *DiscoveryService) bootnodePeerDiscovery() {
// Get a random unconnected bootnode from the bootnode set
bootnode = d.baseServer.GetRandomBootnode()
if bootnode == nil {
// No bootnodes available
return
}

Expand Down Expand Up @@ -361,10 +364,16 @@ func (d *DiscoveryService) bootnodePeerDiscovery() {
}
}()

// Make sure we are peered with a bootnode
d.baseServer.TemporaryDialPeer(bootnode)

// Find peers from the referenced bootnode
foundNodes, err := d.findPeersCall(bootnode.ID, true)
if err != nil {
d.logger.Error("Unable to execute bootnode peer discovery, %w", err)
d.logger.Error("Unable to execute bootnode peer discovery",
"bootnode", bootnode.ID.String(),
"err", err.Error(),
)

return
}
Expand Down
23 changes: 15 additions & 8 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ const (

DefaultLibp2pPort int = 1478

MinimumPeerConnections int64 = 1
MinimumBootNodes int = 1
MinimumPeerConnections int64 = 1
)

var (
Expand Down Expand Up @@ -262,7 +262,7 @@ func (s *Server) Start() error {
}

go s.runDial()
go s.checkPeerConnections()
go s.keepAliveMinimumPeerConnections()

// watch for disconnected peers
s.host.Network().Notify(&network.NotifyBundle{
Expand Down Expand Up @@ -318,8 +318,9 @@ func (s *Server) setupBootnodes() error {
return nil
}

// checkPeerCount will attempt to make new connections if the active peer count is lesser than the specified limit.
func (s *Server) checkPeerConnections() {
// keepAliveMinimumPeerConnections will attempt to make new connections
// if the active peer count is lesser than the specified limit.
func (s *Server) keepAliveMinimumPeerConnections() {
for {
select {
case <-time.After(10 * time.Second):
Expand All @@ -329,10 +330,16 @@ func (s *Server) checkPeerConnections() {

if s.numPeers() < MinimumPeerConnections {
if s.config.NoDiscover || !s.bootnodes.hasBootnodes() {
// TODO: dial peers from the peerstore
// dial unconnected peer
randPeer := s.GetRandomPeer()
if randPeer != nil && !s.IsConnected(*randPeer) {
s.addToDialQueue(s.GetPeerInfo(*randPeer), common.PriorityRandomDial)
}
} else {
randomNode := s.GetRandomBootnode()
s.addToDialQueue(randomNode, common.PriorityRandomDial)
// dial random unconnected bootnode
if randomNode := s.GetRandomBootnode(); randomNode != nil {
s.addToDialQueue(randomNode, common.PriorityRandomDial)
}
}
}
}
Expand Down Expand Up @@ -397,7 +404,7 @@ func (s *Server) runDial() {
// the connection process is async because it involves connection (here) +
// the handshake done in the identity service.
if err := s.host.Connect(context.Background(), *peerInfo); err != nil {
s.logger.Debug("failed to dial", "addr", peerInfo.String(), "err", err)
s.logger.Debug("failed to dial", "addr", peerInfo.String(), "err", err.Error())

s.emitEvent(peerInfo.ID, peerEvent.PeerFailedToConnect)
}
Expand Down
15 changes: 8 additions & 7 deletions network/server_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package network

import (
"crypto/rand"
"errors"
"fmt"
"math/big"
"time"
Expand All @@ -17,10 +16,6 @@ import (
rawGrpc "google.golang.org/grpc"
)

var (
errPeerDisconnected = errors.New("peer disconnected before the discovery client was initialized")
)

// GetRandomBootnode fetches a random bootnode that's currently
// NOT connected, if any
func (s *Server) GetRandomBootnode() *peer.AddrInfo {
Expand Down Expand Up @@ -69,7 +64,8 @@ func (s *Server) NewDiscoveryClient(peerID peer.ID) (proto.DiscoveryClient, erro
// Check if there is a peer connection at this point in time,
// as there might have been a disconnection previously
if !s.IsConnected(peerID) && !isTemporaryDial {
return nil, errPeerDisconnected
return nil, fmt.Errorf("could not initialize new discovery client - peer [%s] not connected",
peerID.String())
}

// Check if there is an active stream connection already
Expand All @@ -93,7 +89,7 @@ func (s *Server) NewDiscoveryClient(peerID peer.ID) (proto.DiscoveryClient, erro
return proto.NewDiscoveryClient(protoStream), nil
}

// saveProtocolStream saves the protocol stream to the peer
// SaveProtocolStream saves the protocol stream to the peer
// protocol stream reference [Thread safe]
func (s *Server) SaveProtocolStream(
protocol string,
Expand Down Expand Up @@ -246,6 +242,11 @@ func (s *Server) setupDiscovery() error {
return nil
}

func (s *Server) TemporaryDialPeer(peerAddrInfo *peer.AddrInfo) {
s.logger.Debug("creating new temporary dial to peer", "peer", peerAddrInfo.ID)
s.addToDialQueue(peerAddrInfo, common.PriorityRandomDial)
}

// registerDiscoveryService registers the discovery protocol to be available
func (s *Server) registerDiscoveryService(discovery *discovery.DiscoveryService) {
grpcStream := grpc.NewGrpcStream()
Expand Down
12 changes: 12 additions & 0 deletions network/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type MockNetworkingServer struct {
getRandomPeerFn getRandomPeerDelegate
fetchAndSetTemporaryDialFn fetchAndSetTemporaryDialDelegate
removeTemporaryDialFn removeTemporaryDialDelegate
temporaryDialPeerFn temporaryDialPeerDelegate
}

func NewMockNetworkingServer() *MockNetworkingServer {
Expand Down Expand Up @@ -85,6 +86,17 @@ type getPeerInfoDelegate func(peer.ID) *peer.AddrInfo
type getRandomPeerDelegate func() *peer.ID
type fetchAndSetTemporaryDialDelegate func(peer.ID, bool) bool
type removeTemporaryDialDelegate func(peer.ID)
type temporaryDialPeerDelegate func(peerAddrInfo *peer.AddrInfo)

func (m *MockNetworkingServer) TemporaryDialPeer(peerAddrInfo *peer.AddrInfo) {
if m.temporaryDialPeerFn != nil {
m.temporaryDialPeerFn(peerAddrInfo)
}
}

func (m *MockNetworkingServer) HookTemporaryDialPeer(fn temporaryDialPeerDelegate) {
m.temporaryDialPeerFn = fn
}

func (m *MockNetworkingServer) NewIdentityClient(peerID peer.ID) (proto.IdentityClient, error) {
if m.newIdentityClientFn != nil {
Expand Down

0 comments on commit 9f1ce9b

Please sign in to comment.