Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/peerset): remove race conditions from peerset package #2267

Merged
merged 58 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
6a7d933
chore: adding read/write mutexes to peerstate
EclesioMeloJunior Feb 3, 2022
b0eb77f
chore: remove channels and use the executor as parameter
EclesioMeloJunior Feb 3, 2022
98cb4bc
chore: remove unneeded stop and ctx
EclesioMeloJunior Feb 3, 2022
a6e97fd
chore: remove unused cancel ctx
EclesioMeloJunior Feb 3, 2022
d8f6700
chore: keep the same log informations
EclesioMeloJunior Feb 4, 2022
a93fafe
chore: resolve lint
EclesioMeloJunior Feb 4, 2022
858a72b
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Feb 7, 2022
55dca9c
chore: define struct inside function
EclesioMeloJunior Feb 7, 2022
f0c0e10
chore: use sync.Mutex composition
EclesioMeloJunior Feb 7, 2022
1f39a56
chore: adding license
EclesioMeloJunior Feb 7, 2022
5366804
chore: keep `Reputation` at `peerset`
EclesioMeloJunior Feb 7, 2022
3695853
chore: replace fun signature to interface implementation
EclesioMeloJunior Mar 4, 2022
36ab22e
chore: move comment
EclesioMeloJunior Mar 4, 2022
fc0cf71
chore: improve test description
EclesioMeloJunior Mar 4, 2022
147575c
chore: improve test exepect function calls
EclesioMeloJunior Mar 4, 2022
8e693b5
chore: add comment to network.Process exported function
EclesioMeloJunior Mar 8, 2022
a340adc
chore: fix TestPeerSetDiscovered unit test
EclesioMeloJunior Mar 11, 2022
a1d5391
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Mar 11, 2022
39e1974
chore: fix typo at comment
EclesioMeloJunior Mar 11, 2022
3773559
chore: wrap errors and add String to ReputationChange
EclesioMeloJunior Mar 11, 2022
71c5ea4
Merge branch 'eclesio/refactor-peerstate' of github.com:ChainSafe/gos…
EclesioMeloJunior Mar 11, 2022
78a5468
chore: remove unneeded comment
EclesioMeloJunior Mar 11, 2022
50ef6cb
chore: implement MessageProcessor interface at network/host
EclesioMeloJunior Mar 14, 2022
4f285ee
chore: rename `doWork` to `periodicallyAllocateSlots`
EclesioMeloJunior Mar 14, 2022
237cfe9
chore: set processor at testing
EclesioMeloJunior Mar 14, 2022
d02fafc
chore: address comments
EclesioMeloJunior Mar 14, 2022
a4adf71
chore: address comments
EclesioMeloJunior Mar 15, 2022
1b49a64
chore: wrap `ErrPeerDoesNotExist`
EclesioMeloJunior Mar 15, 2022
9d4c81a
chore: addressing comments
EclesioMeloJunior Mar 15, 2022
af4380e
chore: fix PeerStateHandler interface
EclesioMeloJunior Mar 16, 2022
c184ccf
chore: improve `Test_Ban_Reject_Accept_Peer` assertions
EclesioMeloJunior Mar 16, 2022
5fe401c
chore: improve the peerset test assertions
EclesioMeloJunior Mar 17, 2022
ca7f676
Update dot/network/state.go
EclesioMeloJunior Mar 17, 2022
60791f1
chore: fix lll lint issue
EclesioMeloJunior Mar 17, 2022
4856eb2
Update dot/peerset/handler.go
EclesioMeloJunior Mar 17, 2022
ad060fe
chore: implementing the reject case in the incoming peer connections
EclesioMeloJunior Mar 17, 2022
2c870d2
chore: fix race conditions in test assertion
EclesioMeloJunior Mar 18, 2022
249eecf
Merge branch 'eclesio/refactor-peerstate' of github.com:ChainSafe/gos…
EclesioMeloJunior Mar 18, 2022
9dcb348
chore: exec `go fmt ./...`
EclesioMeloJunior Mar 18, 2022
c726949
chore: add `//nolint:unparam`
EclesioMeloJunior Mar 18, 2022
9946ead
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Mar 21, 2022
aab7e3a
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Mar 30, 2022
7d2f7c7
chore: use channels with protected maps
EclesioMeloJunior Mar 30, 2022
266d7a2
chore: remove nolintlint warns
EclesioMeloJunior Mar 30, 2022
8a52a8a
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Apr 25, 2022
0dbad30
chore: fixing the newtork service Stop method
EclesioMeloJunior Apr 25, 2022
2207aad
chore: reintroduce the `msgChanSize` comment
EclesioMeloJunior Apr 25, 2022
04221fa
chore: close the `resultMsgCh` once ctx ends
EclesioMeloJunior Apr 25, 2022
0dffe8a
chore: use variables on eclesio/refactor-peerstate
EclesioMeloJunior Apr 25, 2022
77784e2
chore: adding comments
EclesioMeloJunior Apr 25, 2022
d563386
chore: logging ctx error as debug
EclesioMeloJunior Apr 25, 2022
df15a5d
chore: fixing the tests helpers
EclesioMeloJunior Apr 25, 2022
a3b6ea8
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior May 5, 2022
3480177
chore: address comment, asserting log format types
EclesioMeloJunior May 5, 2022
233d2eb
chore: removing diffs
EclesioMeloJunior May 5, 2022
9315906
chore: removing diffs, fix logger format
EclesioMeloJunior May 5, 2022
cb763ab
chore: remove unneeded time.Sleep
EclesioMeloJunior May 5, 2022
53987c1
chore: address error wraping
EclesioMeloJunior May 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
}

cm.host = host
cm.peerSetHandler.SetMessageProcessor(host)
return host, nil
}

Expand Down Expand Up @@ -437,3 +438,38 @@ func (h *host) closeProtocolStream(pID protocol.ID, p peer.ID) {
}
}
}

// Process will connect, drop or reject a peer based on a peerset message
func (h *host) Process(msg peerset.Message) {
peerID := msg.PeerID
if peerID == "" {
logger.Errorf("found empty peer id in peerset message")
return
}
switch msg.Status {
case peerset.Connect:
addrInfo := h.h.Peerstore().PeerInfo(peerID)
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
if len(addrInfo.Addrs) == 0 {
var err error
addrInfo, err = h.discovery.findPeer(peerID)
if err != nil {
logger.Warnf("failed to find peer id %s: %s", peerID, err)
return
}
}

err := h.connect(addrInfo)
if err != nil {
logger.Warnf("failed to open connection for peer %s: %s", peerID, err)
return
}
logger.Debugf("connection successful with peer %s", peerID)
case peerset.Drop, peerset.Reject:
err := h.closePeer(peerID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does closePeer () handle the case where a peer isn't connected (imagining thats the Reject case)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now Reject case is processed but not the right way I think. The network.handleConn(conn libp2pnetwork.Conn) which started the incoming calls the peersetHandler.Incoming method which should check if the peer should be accepted or rejected but this answer is sent to the processor instead of returned to the network.handleConn to stop the handshake with the peer.

I think we should change the following method:

func (s *Service) handleConn(conn libp2pnetwork.Conn) {
	// TODO: currently we only have one set so setID is 0, 
        // change this once we have more set in peerSet.
	s.host.cm.peerSetHandler.Incoming(0, conn.RemotePeer())

to

func (s *Service) handleConn(conn libp2pnetwork.Conn) {
	// TODO: currently we only have one set so setID is 0, 
        // change this once we have more set in peerSet.
	result := s.host.cm.peerSetHandler.Incoming(0, conn.RemotePeer())
        switch result {
        case Reject: 
            logger.Infof("reject ...")
            return
        }

        // exchange BlockAnnounceHandshake with peer so we can start to
	// sync if necessary.
	prtl, has := s.notificationsProtocols[BlockAnnounceMsgType]
	if !has {
		return
	}

	hs, err := prtl.getHandshake()
        ...

@noot, @timwu20 what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@EclesioMeloJunior that makes sense, also in the Reject case you would need to call conn.Close() to close the connection with the peer

if err != nil {
logger.Warnf("failed to close connection with peer %s: %s", peerID, err)
return
}
logger.Debugf("connection dropped successfully for peer %s", peerID)
}
}
58 changes: 2 additions & 56 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ type Service struct {

blockResponseBuf []byte
blockResponseBufMu sync.Mutex

telemetry telemetry.Client
telemetry telemetry.Client
}

// NewService creates a new network service from the configuration and message channels
Expand Down Expand Up @@ -174,6 +173,7 @@ func NewService(cfg *Config) (*Service, error) {
if cfg.batchSize == 0 {
cfg.batchSize = defaultTxnBatchSize
}

// create a new host instance
host, err := newHost(ctx, cfg)
if err != nil {
Expand Down Expand Up @@ -487,8 +487,6 @@ func (s *Service) Stop() error {
logger.Errorf("Failed to close host: %s", err)
}

s.host.cm.peerSetHandler.Stop()

// check if closeCh is closed, if not, close it.
mainloop:
for {
Expand Down Expand Up @@ -680,56 +678,4 @@ func (s *Service) startPeerSetHandler() {
if !s.noBootstrap {
s.host.bootstrap()
}

go s.startProcessingMsg()
}

func (s *Service) processMessage(msg peerset.Message) {
peerID := msg.PeerID
if peerID == "" {
logger.Errorf("found empty peer id in peerset message")
return
}
switch msg.Status {
case peerset.Connect:
addrInfo := s.host.h.Peerstore().PeerInfo(peerID)
if len(addrInfo.Addrs) == 0 {
var err error
addrInfo, err = s.host.discovery.findPeer(peerID)
if err != nil {
logger.Warnf("failed to find peer id %s: %s", peerID, err)
return
}
}

err := s.host.connect(addrInfo)
if err != nil {
logger.Warnf("failed to open connection for peer %s: %s", peerID, err)
return
}
logger.Debugf("connection successful with peer %s", peerID)
case peerset.Drop, peerset.Reject:
err := s.host.closePeer(peerID)
if err != nil {
logger.Warnf("failed to close connection with peer %s: %s", peerID, err)
return
}
logger.Debugf("connection dropped successfully for peer %s", peerID)
}
}

func (s *Service) startProcessingMsg() {
msgCh := s.host.cm.peerSetHandler.Messages()
for {
select {
case <-s.ctx.Done():
return
case msg, ok := <-msgCh:
if !ok {
return
}

s.processMessage(msg)
}
}
}
4 changes: 1 addition & 3 deletions dot/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ type TransactionHandler interface {

// PeerSetHandler is the interface used by the connection manager to handle peerset.
type PeerSetHandler interface {
SetMessageProcessor(peerset.MessageProcessor)
Start(context.Context)
Stop()
ReportPeer(peerset.ReputationChange, ...peer.ID)
PeerAdd
PeerRemove
Expand All @@ -72,6 +72,4 @@ type PeerRemove interface {
// Peer is the interface used by the PeerSetHandler to get the peer data from peerSet.
type Peer interface {
PeerReputation(peer.ID) (peerset.Reputation, error)
SortedPeers(idx int) chan peer.IDSlice
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
Messages() chan peerset.Message
}
133 changes: 62 additions & 71 deletions dot/peerset/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ package peerset

import (
"context"
"fmt"
"strings"

"github.com/libp2p/go-libp2p-core/peer"
)

const logStringPattern = "call=%s, set-id=%d, reputation change %s, peers=[%s]"
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved

// Handler manages peerSet.
type Handler struct {
actionQueue chan<- action
peerSet *PeerSet
closeCh chan struct{}

cancelCtx context.CancelFunc
peerSet *PeerSet
}

// NewPeerSetHandler creates a new *peerset.Handler.
Expand All @@ -30,80 +30,82 @@ func NewPeerSetHandler(cfg *ConfigSet) (*Handler, error) {
}, nil
}

// SetReservedOnlyPeer not yet implemented
func (h *Handler) SetReservedOnlyPeer(setID int, peers ...peer.ID) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like these functions should return an error now, and the caller should log it.

// TODO: not yet implemented (#1888)
logger.Errorf("failed to do action %s on peerSet: not implemented yet", setReservedOnly)
}

// AddReservedPeer adds reserved peer into peerSet.
func (h *Handler) AddReservedPeer(setID int, peers ...peer.ID) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not want any of these functions returning errors?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I'm curious here as well 🤔 Same wondering question for the below methods logging errors.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these methods were intended to be non-blocking, so the caller executes a handler method that will dispatch a message through a channel, and if an error occurs during the message it just logs. Now I have removed the channel and as all those methods was not expected to return any error I keep the log

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should return an error and the caller should log the error.

h.actionQueue <- action{
actionCall: addReservedPeer,
setID: setID,
peers: peers,
err := h.peerSet.addReservedPeers(setID, peers...)
if err != nil {
msg := fmt.Sprintf(logStringPattern, addReservedPeer, setID, "", stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
}
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}

// RemoveReservedPeer remove reserved peer from peerSet.
// RemoveReservedPeer removes reserved peer from peerSet.
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
func (h *Handler) RemoveReservedPeer(setID int, peers ...peer.ID) {
h.actionQueue <- action{
actionCall: removeReservedPeer,
setID: setID,
peers: peers,
err := h.peerSet.removeReservedPeers(setID, peers...)
if err != nil {
msg := fmt.Sprintf(logStringPattern, removeReservedPeer, setID, "", stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
}
}

// SetReservedPeer set the reserve peer into peerSet
// SetReservedPeer sets the reserve peer into peerSet
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
func (h *Handler) SetReservedPeer(setID int, peers ...peer.ID) {
h.actionQueue <- action{
actionCall: setReservedPeers,
setID: setID,
peers: peers,
// TODO: this is not used yet, it might be required to implement an RPC Call for this.
err := h.peerSet.setReservedPeer(setID, peers...)
if err != nil {
msg := fmt.Sprintf(logStringPattern, setReservedPeers, setID, "", stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
}
}

// AddPeer adds peer to peerSet.
func (h *Handler) AddPeer(setID int, peers ...peer.ID) {
h.actionQueue <- action{
actionCall: addToPeerSet,
setID: setID,
peers: peers,
err := h.peerSet.addPeer(setID, peers)
if err != nil {
msg := fmt.Sprintf(logStringPattern, addToPeerSet, setID, "", stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
}
}

// RemovePeer removes peer from peerSet.
func (h *Handler) RemovePeer(setID int, peers ...peer.ID) {
h.actionQueue <- action{
actionCall: removeFromPeerSet,
setID: setID,
peers: peers,
err := h.peerSet.removePeer(setID, peers...)
if err != nil {
msg := fmt.Sprintf(logStringPattern, removeFromPeerSet, setID, "", stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
}
}

// ReportPeer reports ReputationChange according to the peer behaviour.
func (h *Handler) ReportPeer(rep ReputationChange, peers ...peer.ID) {
h.actionQueue <- action{
actionCall: reportPeer,
reputation: rep,
peers: peers,
err := h.peerSet.reportPeer(rep, peers...)
if err != nil {
msg := fmt.Sprintf(logStringPattern, reportPeer, 0, rep.String(), stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
}
}

// Incoming calls when we have an incoming connection from peer.
func (h *Handler) Incoming(setID int, peers ...peer.ID) {
h.actionQueue <- action{
actionCall: incoming,
peers: peers,
setID: setID,
err := h.peerSet.incoming(setID, peers...)
if err != nil {
msg := fmt.Sprintf(logStringPattern, incoming, setID, "", stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
}
}

// Messages return result message chan.
func (h *Handler) Messages() chan Message {
return h.peerSet.resultMsgCh
}

// DisconnectPeer calls for disconnecting a connection from peer.
func (h *Handler) DisconnectPeer(setID int, peers ...peer.ID) {
h.actionQueue <- action{
actionCall: disconnect,
setID: setID,
peers: peers,
err := h.peerSet.disconnect(setID, UnknownDrop, peers...)
if err != nil {
msg := fmt.Sprintf(logStringPattern, disconnect, setID, "", stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
}
}

Expand All @@ -113,40 +115,29 @@ func (h *Handler) PeerReputation(peerID peer.ID) (Reputation, error) {
if err != nil {
return 0, err
}
return n.getReputation(), nil
return n.reputation, nil
}

// SetMessageProcessor set a processor to process peerset messages
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
func (h *Handler) SetMessageProcessor(processor MessageProcessor) {
h.peerSet.processor = processor
}

// Start starts peerSet processing
func (h *Handler) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
h.cancelCtx = cancel

actionCh := make(chan action, msgChanSize)
h.closeCh = make(chan struct{})
h.actionQueue = actionCh

h.peerSet.start(ctx, actionCh)
h.peerSet.start(ctx)
}

// SortedPeers return chan for sorted connected peer in the peerSet.
func (h *Handler) SortedPeers(setIdx int) chan peer.IDSlice {
resultPeersCh := make(chan peer.IDSlice)
h.actionQueue <- action{
actionCall: sortedPeers,
resultPeersCh: resultPeersCh,
setID: setIdx,
}

return resultPeersCh
// SortedPeers returns a sorted peer ID slice for connected peers in the peerSet.
func (h *Handler) SortedPeers(setIdx int) peer.IDSlice {
return h.peerSet.peerState.sortedPeers(setIdx)
}

// Stop closes the actionQueue and result message chan.
func (h *Handler) Stop() {
select {
case <-h.closeCh:
default:
h.cancelCtx()
close(h.closeCh)
close(h.actionQueue)
func stringfyPeers(peers peer.IDSlice) string {
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
peersStrings := make([]string, len(peers))
for i := range peers {
peersStrings[i] = peers[i].String()
}

return strings.Join(peersStrings, ", ")
}
46 changes: 46 additions & 0 deletions dot/peerset/mock_message_processor_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading