Skip to content

Commit

Permalink
fix logic issue: handlers.removePeer() is called twice. (#856)
Browse files Browse the repository at this point in the history
* fix logic issue: handlers.removePeer() is called twice.

There is a logic issue which cause "Ethereum peer removal failed, err=peer not registered" occur quite often.

handler.runEthPeer set up a defer removePeer(). This is always called after a peer is disconnected.
However removePeer is also called by mulitple functions like downloader/fetcher.  After those kind of functions removePeer(), peer handler executes defer removePeer(). This makes removePeer() happened twice, and this is the reason we often see "Ethereum peer removal failed, err=peer not registered".

To solve this, removePeer only needs to hard Disconnect peer from networking layer. Then defer unregisterPeer() will do the cleanup task after then.

* fix: modify test function for close testing.

reference from go-thereum.

Co-authored-by: zjubfd <296179868@qq.com>
  • Loading branch information
Jolly23 and unclezoro authored Apr 24, 2022
1 parent 1aeadc1 commit 15bc254
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 11 deletions.
16 changes: 11 additions & 5 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
peer.Log().Error("Ethereum peer registration failed", "err", err)
return err
}
defer h.removePeer(peer.ID())
defer h.unregisterPeer(peer.ID())

p := h.peers.peer(peer.ID())
if p == nil {
Expand Down Expand Up @@ -395,9 +395,17 @@ func (h *handler) runDiffExtension(peer *diff.Peer, handler diff.Handler) error
return handler(peer)
}

// removePeer unregisters a peer from the downloader and fetchers, removes it from
// the set of tracked peers and closes the network connection to it.
// removePeer requests disconnection of a peer.
func (h *handler) removePeer(id string) {
peer := h.peers.peer(id)
if peer != nil {
// Hard disconnect at the networking layer. Handler will get an EOF and terminate the peer. defer unregisterPeer will do the cleanup task after then.
peer.Peer.Disconnect(p2p.DiscUselessPeer)
}
}

// unregisterPeer removes a peer from the downloader, fetchers and main peer set.
func (h *handler) unregisterPeer(id string) {
// Create a custom logger to avoid printing the entire id
var logger log.Logger
if len(id) < 16 {
Expand Down Expand Up @@ -425,8 +433,6 @@ func (h *handler) removePeer(id string) {
if err := h.peers.unregisterPeer(id); err != nil {
logger.Error("Ethereum peer removal failed", "err", err)
}
// Hard disconnect at the networking layer
peer.Peer.Disconnect(p2p.DiscUselessPeer)
}

func (h *handler) Start(maxPeers int) {
Expand Down
17 changes: 12 additions & 5 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,14 +641,20 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
defer p2pLocal.Close()
defer p2pRemote.Close()

local := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{1}, "", nil), p2pLocal, handler.txpool)
remote := eth.NewPeer(eth.ETH65, p2p.NewPeer(enode.ID{2}, "", nil), p2pRemote, handler.txpool)
local := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pLocal), p2pLocal, handler.txpool)
remote := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pRemote), p2pRemote, handler.txpool)
defer local.Close()
defer remote.Close()

go handler.handler.runEthPeer(local, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(handler.handler), peer)
})
handlerDone := make(chan struct{})
go func() {
defer close(handlerDone)
handler.handler.runEthPeer(local, func(peer *eth.Peer) error {
err := eth.Handle((*ethHandler)(handler.handler), peer)
return err
})
}()

// Run the handshake locally to avoid spinning up a remote handler
var (
genesis = handler.chain.Genesis()
Expand Down Expand Up @@ -685,6 +691,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo

// Verify that the remote peer is maintained or dropped
if drop {
<-handlerDone
if peers := handler.handler.peers.len(); peers != 0 {
t.Fatalf("peer count mismatch: have %d, want %d", peers, 0)
}
Expand Down
16 changes: 15 additions & 1 deletion p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ type Peer struct {
disc chan DiscReason

// events receives message send / receive events if set
events *event.Feed
events *event.Feed
testPipe *MsgPipeRW // for testing
}

// NewPeer returns a peer for testing purposes.
Expand All @@ -129,6 +130,15 @@ func NewPeer(id enode.ID, name string, caps []Cap) *Peer {
return peer
}

// NewPeerPipe creates a peer for testing purposes.
// The message pipe given as the last parameter is closed when
// Disconnect is called on the peer.
func NewPeerPipe(id enode.ID, name string, caps []Cap, pipe *MsgPipeRW) *Peer {
p := NewPeer(id, name, caps)
p.testPipe = pipe
return p
}

// NewPeerWithProtocols returns a peer for testing purposes.
func NewPeerWithProtocols(id enode.ID, protocols []Protocol, name string, caps []Cap) *Peer {
pipe, _ := net.Pipe()
Expand Down Expand Up @@ -196,6 +206,10 @@ func (p *Peer) LocalAddr() net.Addr {
// Disconnect terminates the peer connection with the given reason.
// It returns immediately and does not wait until the connection is closed.
func (p *Peer) Disconnect(reason DiscReason) {
if p.testPipe != nil {
p.testPipe.Close()
}

select {
case p.disc <- reason:
case <-p.closed:
Expand Down

0 comments on commit 15bc254

Please sign in to comment.