Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into separate-node
Browse files Browse the repository at this point in the history
  • Loading branch information
dean65 committed Apr 28, 2022
2 parents 5f22105 + 15bc254 commit 4771d86
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 11 deletions.
16 changes: 11 additions & 5 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
peer.Log().Error("Ethereum peer registration failed", "err", err)
return err
}
defer h.removePeer(peer.ID())
defer h.unregisterPeer(peer.ID())

p := h.peers.peer(peer.ID())
if p == nil {
Expand Down Expand Up @@ -418,9 +418,17 @@ func (h *handler) runTrustExtension(peer *trust.Peer, handler trust.Handler) err
return handler(peer)
}

// removePeer unregisters a peer from the downloader and fetchers, removes it from
// the set of tracked peers and closes the network connection to it.
// removePeer requests disconnection of a peer.
func (h *handler) removePeer(id string) {
peer := h.peers.peer(id)
if peer != nil {
// Hard disconnect at the networking layer. Handler will get an EOF and terminate the peer. defer unregisterPeer will do the cleanup task after then.
peer.Peer.Disconnect(p2p.DiscUselessPeer)
}
}

// unregisterPeer removes a peer from the downloader, fetchers and main peer set.
func (h *handler) unregisterPeer(id string) {
// Create a custom logger to avoid printing the entire id
var logger log.Logger
if len(id) < 16 {
Expand Down Expand Up @@ -448,8 +456,6 @@ func (h *handler) removePeer(id string) {
if err := h.peers.unregisterPeer(id); err != nil {
logger.Error("Ethereum peer removal failed", "err", err)
}
// Hard disconnect at the networking layer
peer.Peer.Disconnect(p2p.DiscUselessPeer)
}

func (h *handler) Start(maxPeers int) {
Expand Down
17 changes: 12 additions & 5 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,14 +641,20 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
defer p2pLocal.Close()
defer p2pRemote.Close()

local := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{1}, "", nil), p2pLocal, handler.txpool)
remote := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{2}, "", nil), p2pRemote, handler.txpool)
local := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pLocal), p2pLocal, handler.txpool)
remote := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pRemote), p2pRemote, handler.txpool)
defer local.Close()
defer remote.Close()

go handler.handler.runEthPeer(local, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(handler.handler), peer)
})
handlerDone := make(chan struct{})
go func() {
defer close(handlerDone)
handler.handler.runEthPeer(local, func(peer *eth.Peer) error {
err := eth.Handle((*ethHandler)(handler.handler), peer)
return err
})
}()

// Run the handshake locally to avoid spinning up a remote handler
var (
genesis = handler.chain.Genesis()
Expand Down Expand Up @@ -685,6 +691,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo

// Verify that the remote peer is maintained or dropped
if drop {
<-handlerDone
if peers := handler.handler.peers.len(); peers != 0 {
t.Fatalf("peer count mismatch: have %d, want %d", peers, 0)
}
Expand Down
16 changes: 15 additions & 1 deletion p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ type Peer struct {
disc chan DiscReason

// events receives message send / receive events if set
events *event.Feed
events *event.Feed
testPipe *MsgPipeRW // for testing
}

// NewPeer returns a peer for testing purposes.
Expand All @@ -129,6 +130,15 @@ func NewPeer(id enode.ID, name string, caps []Cap) *Peer {
return peer
}

// NewPeerPipe creates a peer for testing purposes.
// The message pipe given as the last parameter is closed when
// Disconnect is called on the peer.
func NewPeerPipe(id enode.ID, name string, caps []Cap, pipe *MsgPipeRW) *Peer {
p := NewPeer(id, name, caps)
p.testPipe = pipe
return p
}

// NewPeerWithProtocols returns a peer for testing purposes.
func NewPeerWithProtocols(id enode.ID, protocols []Protocol, name string, caps []Cap) *Peer {
pipe, _ := net.Pipe()
Expand Down Expand Up @@ -196,6 +206,10 @@ func (p *Peer) LocalAddr() net.Addr {
// Disconnect terminates the peer connection with the given reason.
// It returns immediately and does not wait until the connection is closed.
func (p *Peer) Disconnect(reason DiscReason) {
if p.testPipe != nil {
p.testPipe.Close()
}

select {
case p.disc <- reason:
case <-p.closed:
Expand Down

0 comments on commit 4771d86

Please sign in to comment.