Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/peerset): remove race conditions from peerset package #2267

Merged
merged 58 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
6a7d933
chore: adding read/write mutexes to peerstate
EclesioMeloJunior Feb 3, 2022
b0eb77f
chore: remove channels and use the executor as parameter
EclesioMeloJunior Feb 3, 2022
98cb4bc
chore: remove unneeded stop and ctx
EclesioMeloJunior Feb 3, 2022
a6e97fd
chore: remove unused cancel ctx
EclesioMeloJunior Feb 3, 2022
d8f6700
chore: keep the same log informations
EclesioMeloJunior Feb 4, 2022
a93fafe
chore: resolve lint
EclesioMeloJunior Feb 4, 2022
858a72b
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Feb 7, 2022
55dca9c
chore: define struct inside function
EclesioMeloJunior Feb 7, 2022
f0c0e10
chore: use sync.Mutex composition
EclesioMeloJunior Feb 7, 2022
1f39a56
chore: adding license
EclesioMeloJunior Feb 7, 2022
5366804
chore: keep `Reputation` at `peerset`
EclesioMeloJunior Feb 7, 2022
3695853
chore: replace fun signature to interface implementation
EclesioMeloJunior Mar 4, 2022
36ab22e
chore: move comment
EclesioMeloJunior Mar 4, 2022
fc0cf71
chore: improve test description
EclesioMeloJunior Mar 4, 2022
147575c
chore: improve test exepect function calls
EclesioMeloJunior Mar 4, 2022
8e693b5
chore: add comment to network.Process exported function
EclesioMeloJunior Mar 8, 2022
a340adc
chore: fix TestPeerSetDiscovered unit test
EclesioMeloJunior Mar 11, 2022
a1d5391
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Mar 11, 2022
39e1974
chore: fix typo at comment
EclesioMeloJunior Mar 11, 2022
3773559
chore: wrap errors and add String to ReputationChange
EclesioMeloJunior Mar 11, 2022
71c5ea4
Merge branch 'eclesio/refactor-peerstate' of github.com:ChainSafe/gos…
EclesioMeloJunior Mar 11, 2022
78a5468
chore: remove unneeded comment
EclesioMeloJunior Mar 11, 2022
50ef6cb
chore: implement MessageProcessor interface at network/host
EclesioMeloJunior Mar 14, 2022
4f285ee
chore: rename `doWork` to `periodicallyAllocateSlots`
EclesioMeloJunior Mar 14, 2022
237cfe9
chore: set processor at testing
EclesioMeloJunior Mar 14, 2022
d02fafc
chore: address comments
EclesioMeloJunior Mar 14, 2022
a4adf71
chore: address comments
EclesioMeloJunior Mar 15, 2022
1b49a64
chore: wrap `ErrPeerDoesNotExist`
EclesioMeloJunior Mar 15, 2022
9d4c81a
chore: addressing comments
EclesioMeloJunior Mar 15, 2022
af4380e
chore: fix PeerStateHandler interface
EclesioMeloJunior Mar 16, 2022
c184ccf
chore: improve `Test_Ban_Reject_Accept_Peer` assertions
EclesioMeloJunior Mar 16, 2022
5fe401c
chore: improve the peerset test assertions
EclesioMeloJunior Mar 17, 2022
ca7f676
Update dot/network/state.go
EclesioMeloJunior Mar 17, 2022
60791f1
chore: fix lll lint issue
EclesioMeloJunior Mar 17, 2022
4856eb2
Update dot/peerset/handler.go
EclesioMeloJunior Mar 17, 2022
ad060fe
chore: implementing the reject case in the incoming peer connections
EclesioMeloJunior Mar 17, 2022
2c870d2
chore: fix race conditions in test assertion
EclesioMeloJunior Mar 18, 2022
249eecf
Merge branch 'eclesio/refactor-peerstate' of github.com:ChainSafe/gos…
EclesioMeloJunior Mar 18, 2022
9dcb348
chore: exec `go fmt ./...`
EclesioMeloJunior Mar 18, 2022
c726949
chore: add `//nolint:unparam`
EclesioMeloJunior Mar 18, 2022
9946ead
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Mar 21, 2022
aab7e3a
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Mar 30, 2022
7d2f7c7
chore: use channels with protected maps
EclesioMeloJunior Mar 30, 2022
266d7a2
chore: remove nolintlint warns
EclesioMeloJunior Mar 30, 2022
8a52a8a
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior Apr 25, 2022
0dbad30
chore: fixing the newtork service Stop method
EclesioMeloJunior Apr 25, 2022
2207aad
chore: reintroduce the `msgChanSize` comment
EclesioMeloJunior Apr 25, 2022
04221fa
chore: close the `resultMsgCh` once ctx ends
EclesioMeloJunior Apr 25, 2022
0dffe8a
chore: use variables on eclesio/refactor-peerstate
EclesioMeloJunior Apr 25, 2022
77784e2
chore: adding comments
EclesioMeloJunior Apr 25, 2022
d563386
chore: logging ctx error as debug
EclesioMeloJunior Apr 25, 2022
df15a5d
chore: fixing the tests helpers
EclesioMeloJunior Apr 25, 2022
a3b6ea8
Merge branch 'development' into eclesio/refactor-peerstate
EclesioMeloJunior May 5, 2022
3480177
chore: address comment, asserting log format types
EclesioMeloJunior May 5, 2022
233d2eb
chore: removing diffs
EclesioMeloJunior May 5, 2022
9315906
chore: removing diffs, fix logger format
EclesioMeloJunior May 5, 2022
cb763ab
chore: remove unneeded time.Sleep
EclesioMeloJunior May 5, 2022
53987c1
chore: address error wraping
EclesioMeloJunior May 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 2 additions & 21 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func NewService(cfg *Config) (*Service, error) {
if cfg.batchSize == 0 {
cfg.batchSize = defaultTxnBatchSize
}

// create a new host instance
host, err := newHost(ctx, cfg)
if err != nil {
Expand Down Expand Up @@ -507,8 +508,6 @@ func (s *Service) Stop() error {
logger.Errorf("Failed to close host: %s", err)
}

s.host.cm.peerSetHandler.Stop()

// check if closeCh is closed, if not, close it.
mainloop:
for {
Expand Down Expand Up @@ -695,13 +694,11 @@ func (s *Service) ReportPeer(change peerset.ReputationChange, p peer.ID) {
}

func (s *Service) startPeerSetHandler() {
s.host.cm.peerSetHandler.Start(s.ctx)
s.host.cm.peerSetHandler.Start(s.ctx, s.processMessage)
// wait for peerSetHandler to start.
if !s.noBootstrap {
s.host.bootstrap()
}

go s.startProcessingMsg()
}

func (s *Service) processMessage(msg peerset.Message) {
Expand Down Expand Up @@ -737,19 +734,3 @@ func (s *Service) processMessage(msg peerset.Message) {
logger.Debugf("connection dropped successfully for peer %s", peerID)
}
}

func (s *Service) startProcessingMsg() {
msgCh := s.host.cm.peerSetHandler.Messages()
for {
select {
case <-s.ctx.Done():
return
case msg, ok := <-msgCh:
if !ok {
return
}

s.processMessage(msg)
}
}
}
5 changes: 1 addition & 4 deletions dot/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ type TransactionHandler interface {

// PeerSetHandler is the interface used by the connection manager to handle peerset.
type PeerSetHandler interface {
Start(context.Context)
Stop()
Start(context.Context, func(peerset.Message))
ReportPeer(peerset.ReputationChange, ...peer.ID)
PeerAdd
PeerRemove
Expand All @@ -73,6 +72,4 @@ type PeerRemove interface {
// Peer is the interface used by the PeerSetHandler to get the peer data from peerSet.
type Peer interface {
PeerReputation(peer.ID) (peerset.Reputation, error)
SortedPeers(idx int) chan peer.IDSlice
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
Messages() chan peerset.Message
}
124 changes: 55 additions & 69 deletions dot/peerset/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ package peerset

import (
"context"
"fmt"
"strings"

"github.com/libp2p/go-libp2p-core/peer"
)

const logStringPattern = "call=%s, set-id=%d, reputation change %v, peers=[%s]"

// Handler manages peerSet.
type Handler struct {
actionQueue chan<- action
peerSet *PeerSet
closeCh chan struct{}

cancelCtx context.CancelFunc
peerSet *PeerSet
}

// NewPeerSetHandler creates a new *peerset.Handler.
Expand All @@ -30,80 +30,82 @@ func NewPeerSetHandler(cfg *ConfigSet) (*Handler, error) {
}, nil
}

// SetReservedOnlyPeer not yet implemented
func (h *Handler) SetReservedOnlyPeer(setID int, peers ...peer.ID) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like these functions should return an error now, and the caller should log it.

// TODO: not yet implemented (#1888)
logger.Errorf("failed to do action %s on peerSet: not implemented yet", setReservedOnly)
}

// AddReservedPeer adds reserved peer into peerSet.
func (h *Handler) AddReservedPeer(setID int, peers ...peer.ID) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not want any of these functions returning errors?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I'm curious here as well 🤔 Same wondering question for the below methods logging errors.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these methods were intended to be non-blocking, so the caller executes a handler method that will dispatch a message through a channel, and if an error occurs during the message it just logs. Now I have removed the channel and as all those methods was not expected to return any error I keep the log

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should return an error and the caller should log the error.

h.actionQueue <- action{
actionCall: addReservedPeer,
setID: setID,
peers: peers,
err := h.peerSet.addReservedPeers(setID, peers...)
if err != nil {
msg := fmt.Sprintf(logStringPattern, addReservedPeer, setID, nil, stringfyPeers(peers))
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
}
}

// RemoveReservedPeer remove reserved peer from peerSet.
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
func (h *Handler) RemoveReservedPeer(setID int, peers ...peer.ID) {
h.actionQueue <- action{
actionCall: removeReservedPeer,
setID: setID,
peers: peers,
err := h.peerSet.removeReservedPeers(setID, peers...)
if err != nil {
msg := fmt.Sprintf(logStringPattern, removeReservedPeer, setID, nil, stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}
}

// SetReservedPeer set the reserve peer into peerSet
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
func (h *Handler) SetReservedPeer(setID int, peers ...peer.ID) {
h.actionQueue <- action{
actionCall: setReservedPeers,
setID: setID,
peers: peers,
// TODO: this is not used yet, might required to implement RPC Call for this.
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
err := h.peerSet.setReservedPeer(setID, peers...)
if err != nil {
msg := fmt.Sprintf(logStringPattern, setReservedPeers, setID, nil, stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
}
}

// AddPeer adds peer to peerSet.
func (h *Handler) AddPeer(setID int, peers ...peer.ID) {
h.actionQueue <- action{
actionCall: addToPeerSet,
setID: setID,
peers: peers,
err := h.peerSet.addPeer(setID, peers)
if err != nil {
msg := fmt.Sprintf(logStringPattern, addToPeerSet, setID, nil, stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}
}

// RemovePeer removes peer from peerSet.
func (h *Handler) RemovePeer(setID int, peers ...peer.ID) {
h.actionQueue <- action{
actionCall: removeFromPeerSet,
setID: setID,
peers: peers,
err := h.peerSet.removePeer(setID, peers...)
if err != nil {
msg := fmt.Sprintf(logStringPattern, removeFromPeerSet, setID, nil, stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
}
}

// ReportPeer reports ReputationChange according to the peer behaviour.
func (h *Handler) ReportPeer(rep ReputationChange, peers ...peer.ID) {
h.actionQueue <- action{
actionCall: reportPeer,
reputation: rep,
peers: peers,
err := h.peerSet.reportPeer(rep, peers...)
if err != nil {
msg := fmt.Sprintf(logStringPattern, reportPeer, 0, rep, stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
}
}

// Incoming calls when we have an incoming connection from peer.
func (h *Handler) Incoming(setID int, peers ...peer.ID) {
h.actionQueue <- action{
actionCall: incoming,
peers: peers,
setID: setID,
err := h.peerSet.incoming(setID, peers...)
if err != nil {
msg := fmt.Sprintf(logStringPattern, incoming, setID, nil, stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
}
}

// Messages return result message chan.
func (h *Handler) Messages() chan Message {
return h.peerSet.resultMsgCh
}

// DisconnectPeer calls for disconnecting a connection from peer.
func (h *Handler) DisconnectPeer(setID int, peers ...peer.ID) {
h.actionQueue <- action{
actionCall: disconnect,
setID: setID,
peers: peers,
err := h.peerSet.disconnect(setID, UnknownDrop, peers...)
if err != nil {
msg := fmt.Sprintf(logStringPattern, disconnect, setID, nil, stringfyPeers(peers))
logger.Errorf("failed to do action %s on peerSet: %s", msg, err)
}
}

Expand All @@ -113,40 +115,24 @@ func (h *Handler) PeerReputation(peerID peer.ID) (Reputation, error) {
if err != nil {
return 0, err
}
return n.getReputation(), nil
return n.rep, nil
}

// Start starts peerSet processing
func (h *Handler) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
h.cancelCtx = cancel

actionCh := make(chan action, msgChanSize)
h.closeCh = make(chan struct{})
h.actionQueue = actionCh

h.peerSet.start(ctx, actionCh)
func (h *Handler) Start(ctx context.Context, processMessageFn func(Message)) {
h.peerSet.start(ctx, processMessageFn)
}

// SortedPeers return chan for sorted connected peer in the peerSet.
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
func (h *Handler) SortedPeers(setIdx int) chan peer.IDSlice {
resultPeersCh := make(chan peer.IDSlice)
h.actionQueue <- action{
actionCall: sortedPeers,
resultPeersCh: resultPeersCh,
setID: setIdx,
}

return resultPeersCh
func (h *Handler) SortedPeers(setIdx int) peer.IDSlice {
return h.peerSet.peerState.sortedPeers(setIdx)
}

// Stop closes the actionQueue and result message chan.
func (h *Handler) Stop() {
select {
case <-h.closeCh:
default:
h.cancelCtx()
close(h.closeCh)
close(h.actionQueue)
func stringfyPeers(peers peer.IDSlice) string {
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
peersStrings := make([]string, len(peers))
for i := range peers {
peersStrings[i] = peers[i].String()
}

return strings.Join(peersStrings, ", ")
}
Loading