Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: solve potential deadlock for bsc p2p protocol handshake #1479

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions eth/handler_bsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,6 @@ func (h *bscHandler) Chain() *core.BlockChain { return h.chain }

// RunPeer is invoked when a peer joins on the `bsc` protocol.
func (h *bscHandler) RunPeer(peer *bsc.Peer, hand bsc.Handler) error {
if err := peer.Handshake(); err != nil {
// ensure that waitBscExtension receives the exit signal normally
// otherwise, can't graceful shutdown
ps := h.peers
id := peer.ID()

// Ensure nobody can double connect
ps.lock.Lock()
if wait, ok := ps.bscWait[id]; ok {
delete(ps.bscWait, id)
wait <- peer
}
ps.lock.Unlock()
return err
}
return (*handler)(h).runBscExtension(peer, hand)
}

Expand Down
47 changes: 23 additions & 24 deletions eth/handler_bsc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,32 +110,32 @@ func testSendVotes(t *testing.T, protocol uint) {
defer localBsc.Close()
defer remoteBsc.Close()

go func(p *bsc.Peer) {
(*bscHandler)(handler.handler).RunPeer(p, func(peer *bsc.Peer) error {
return bsc.Handle((*bscHandler)(handler.handler), peer)
})
}(localBsc)

time.Sleep(200 * time.Millisecond)
remoteBsc.Handshake()

time.Sleep(200 * time.Millisecond)
go func(p *eth.Peer) {
handler.handler.runEthPeer(p, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(handler.handler), peer)
})
}(localEth)

time.Sleep(200 * time.Millisecond)
go func(p *bsc.Peer) {
(*bscHandler)(handler.handler).RunPeer(p, func(peer *bsc.Peer) error {
return bsc.Handle((*bscHandler)(handler.handler), peer)
})
}(localBsc)

// Run the handshake locally to avoid spinning up a source handler
var (
genesis = handler.chain.Genesis()
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
)
time.Sleep(200 * time.Millisecond)
if err := remoteEth.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake: %d", err)
t.Fatalf("failed to run eth protocol handshake: %d", err)
}
if err := remoteBsc.Handshake(); err != nil {
t.Fatalf("failed to run bsc protocol handshake: %d", err)
}

// After the handshake completes, the source handler should stream the sink
// the votes, subscribe to all inbound network events
backend := new(testBscHandler)
Expand Down Expand Up @@ -221,31 +221,30 @@ func testRecvVotes(t *testing.T, protocol uint) {
defer localBsc.Close()
defer remoteBsc.Close()

go func(p *bsc.Peer) {
(*bscHandler)(handler.handler).RunPeer(p, func(peer *bsc.Peer) error {
return bsc.Handle((*bscHandler)(handler.handler), peer)
})
}(localBsc)

time.Sleep(200 * time.Millisecond)
remoteBsc.Handshake()

time.Sleep(200 * time.Millisecond)
go func(p *eth.Peer) {
handler.handler.runEthPeer(p, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(handler.handler), peer)
})
}(localEth)

time.Sleep(200 * time.Millisecond)
go func(p *bsc.Peer) {
(*bscHandler)(handler.handler).RunPeer(p, func(peer *bsc.Peer) error {
return bsc.Handle((*bscHandler)(handler.handler), peer)
})
}(localBsc)

// Run the handshake locally to avoid spinning up a source handler
var (
genesis = handler.chain.Genesis()
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
)
time.Sleep(200 * time.Millisecond)
if err := remoteEth.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake: %d", err)
t.Fatalf("failed to run eth protocol handshake: %d", err)
}
if err := remoteBsc.Handshake(); err != nil {
t.Fatalf("failed to run bsc protocol handshake: %d", err)
}

votesCh := make(chan core.NewVoteEvent)
Expand Down
7 changes: 6 additions & 1 deletion eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,12 @@ func (ps *peerSet) registerPeer(peer *eth.Peer, ext *snap.Peer, diffExt *diff.Pe
eth.trustExt = &trustPeer{trustExt}
}
if bscExt != nil {
eth.bscExt = &bscPeer{bscExt}
if err := bscExt.Handshake(); err == nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this go against the design of geth? If every protocol followed the same pattern, the handshakes of the different protocols would happen serially.

Copy link
Contributor Author

@NathanBSC NathanBSC Apr 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The design of geth has no handshake for any protocol other than eth,
so waitBscExtension and registerBscExtension can finish quickly.
When a sub-protocol handshake is involved in those two, they begin to slow down and lead to problems.

eth.bscExt = &bscPeer{bscExt}
} else {
peer.Log().Debug("bsc protocol handshake failed", "err", err)
return err
}
}
ps.peers[id] = eth
return nil
Expand Down
17 changes: 17 additions & 0 deletions eth/protocols/bsc/handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bsc

import (
"errors"
"fmt"
"time"

Expand All @@ -11,6 +12,17 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
)

const (
// handshakeWaitTimeout is the maximum allowed time for the extension waiting to
// complete handshake before dropping the connection as malicious.
handshakeWaitTimeout = 20 * time.Second
)

var (
// errHandshakeWaitTimeout is returned if a peer waits handshake for too long
errHandshakeWaitTimeout = errors.New("peer handshake wait timeout")
)

// Handler is a callback to invoke from an outside runner after the boilerplate
// exchanges have passed.
type Handler func(peer *Peer) error
Expand Down Expand Up @@ -74,6 +86,11 @@ func MakeProtocols(backend Backend, dnsdisc enode.Iterator) []p2p.Protocol {
// Handle is the callback invoked to manage the life cycle of a `bsc` peer.
// When this function terminates, the peer is disconnected.
func Handle(backend Backend, peer *Peer) error {
select {
case <-peer.handshaked:
Copy link
Collaborator

@unclezoro unclezoro Apr 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seem extensible: the channel is only used by the bsc protocol, so it won't work if another extension is needed in the future.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better for snap and bsc to share the same pattern.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

snap has no handshake, so it's easy.
All the problems begin when a sub-protocol needs a handshake.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seem extensible: the channel is only used by the bsc protocol, so it won't work if another extension is needed in the future.

Another sub-protocol that needs a handshake should have its own handshaked channel.

case <-time.After(handshakeWaitTimeout):
return errHandshakeWaitTimeout
}
for {
if err := handleMessage(backend, peer); err != nil {
peer.Log().Debug("Message handling failed in `bsc`", "err", err)
Expand Down
1 change: 1 addition & 0 deletions eth/protocols/bsc/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (p *Peer) Handshake() error {
return p2p.DiscReadTimeout
}
}
p.handshaked <- struct{}{}
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions eth/protocols/bsc/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Peer struct {
version uint // Protocol version negotiated
logger log.Logger // Contextual logger with the peer id injected
term chan struct{} // Termination channel to stop the broadcasters

handshaked chan struct{} // channel to start handleMessage after handshake
}

// NewPeer create a wrapper for a network connection and negotiated protocol
Expand All @@ -49,6 +51,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
version: version,
logger: log.New("peer", id[:8]),
term: make(chan struct{}),
handshaked: make(chan struct{}, 1), //asynchronous for handshake only once
}
go peer.broadcastVotes()
return peer
Expand Down