From 32d0c188e6c3fc003001db41ae4ae59d9c99bb89 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 19 Feb 2019 14:57:34 -0800 Subject: [PATCH] fix(wantlist): remove races on setup fix race conditions while setting up wantlists by creating peer queues on demand BREAKING CHANGE: PeerManager SendMessage signature changed fix #51 --- bitswap.go | 6 +- peermanager/peermanager.go | 154 ++++++++------------------------ peermanager/peermanager_test.go | 14 ++- wantmanager/wantmanager.go | 48 ++++++++-- wantmanager/wantmanager_test.go | 32 ++++--- 5 files changed, 103 insertions(+), 151 deletions(-) diff --git a/bitswap.go b/bitswap.go index 97e1daa1..3abbc197 100644 --- a/bitswap.go +++ b/bitswap.go @@ -132,7 +132,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, } bs.wm.SetDelegate(bs.pm) - bs.pm.Startup() bs.wm.Startup() bs.pqm.Startup() network.SetDelegate(bs) @@ -361,14 +360,13 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) { // Connected/Disconnected warns bitswap about peer connections. func (bs *Bitswap) PeerConnected(p peer.ID) { - initialWants := bs.wm.CurrentBroadcastWants() - bs.pm.Connected(p, initialWants) + bs.wm.Connected(p) bs.engine.PeerConnected(p) } // Connected/Disconnected warns bitswap about peer connections. func (bs *Bitswap) PeerDisconnected(p peer.ID) { - bs.pm.Disconnected(p) + bs.wm.Disconnected(p) bs.engine.PeerDisconnected(p) } diff --git a/peermanager/peermanager.go b/peermanager/peermanager.go index fed1b3f7..ca7665cf 100644 --- a/peermanager/peermanager.go +++ b/peermanager/peermanager.go @@ -2,6 +2,7 @@ package peermanager import ( "context" + "sync" bsmsg "github.com/ipfs/go-bitswap/message" wantlist "github.com/ipfs/go-bitswap/wantlist" @@ -34,150 +35,56 @@ type peerMessage interface { // PeerManager manages a pool of peers and sends messages to peers in the pool. type PeerManager struct { - // sync channel for Run loop - peerMessages chan peerMessage - - // synchronized by Run loop, only touch inside there - peerQueues map[peer.ID]PeerQueue - + peerQueues map[peer.ID]PeerQueue + lk sync.RWMutex createPeerQueue PeerQueueFactory ctx context.Context - cancel func() } // New creates a new PeerManager, given a context and a peerQueueFactory. func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager { - ctx, cancel := context.WithCancel(ctx) return &PeerManager{ - peerMessages: make(chan peerMessage, 10), peerQueues: make(map[peer.ID]PeerQueue), createPeerQueue: createPeerQueue, ctx: ctx, - cancel: cancel, } } // ConnectedPeers returns a list of peers this PeerManager is managing. func (pm *PeerManager) ConnectedPeers() []peer.ID { - resp := make(chan []peer.ID, 1) - select { - case pm.peerMessages <- &getPeersMessage{resp}: - case <-pm.ctx.Done(): - return nil - } - select { - case peers := <-resp: - return peers - case <-pm.ctx.Done(): - return nil - } -} - -// Connected is called to add a new peer to the pool, and send it an initial set -// of wants. -func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) { - select { - case pm.peerMessages <- &connectPeerMessage{p, initialEntries}: - case <-pm.ctx.Done(): - } -} - -// Disconnected is called to remove a peer from the pool. -func (pm *PeerManager) Disconnected(p peer.ID) { - select { - case pm.peerMessages <- &disconnectPeerMessage{p}: - case <-pm.ctx.Done(): - } -} - -// SendMessage is called to send a message to all or some peers in the pool; -// if targets is nil, it sends to all. -func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) { - select { - case pm.peerMessages <- &sendPeerMessage{entries: entries, targets: targets, from: from}: - case <-pm.ctx.Done(): - } -} - -// Startup enables the run loop for the PeerManager - no processing will occur -// if startup is not called. -func (pm *PeerManager) Startup() { - go pm.run() -} - -// Shutdown shutsdown processing for the PeerManager. -func (pm *PeerManager) Shutdown() { - pm.cancel() -} - -func (pm *PeerManager) run() { - for { - select { - case message := <-pm.peerMessages: - message.handle(pm) - case <-pm.ctx.Done(): - return - } - } -} + pm.lk.RLock() + defer pm.lk.RUnlock() -type sendPeerMessage struct { - entries []*bsmsg.Entry - targets []peer.ID - from uint64 -} - -func (s *sendPeerMessage) handle(pm *PeerManager) { - pm.sendMessage(s) -} - -type connectPeerMessage struct { - p peer.ID - initialEntries []*wantlist.Entry -} - -func (c *connectPeerMessage) handle(pm *PeerManager) { - pm.startPeerHandler(c.p, c.initialEntries) -} - -type disconnectPeerMessage struct { - p peer.ID -} - -func (dc *disconnectPeerMessage) handle(pm *PeerManager) { - pm.stopPeerHandler(dc.p) -} - -type getPeersMessage struct { - peerResp chan<- []peer.ID -} - -func (gp *getPeersMessage) handle(pm *PeerManager) { - pm.getPeers(gp.peerResp) -} - -func (pm *PeerManager) getPeers(peerResp chan<- []peer.ID) { peers := make([]peer.ID, 0, len(pm.peerQueues)) for p := range pm.peerQueues { peers = append(peers, p) } - peerResp <- peers + + return peers } -func (pm *PeerManager) startPeerHandler(p peer.ID, initialEntries []*wantlist.Entry) PeerQueue { +// Connected is called to add a new peer to the pool, and send it an initial set +// of wants. +func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) { + pm.lk.Lock() + defer pm.lk.Unlock() + mq, ok := pm.peerQueues[p] if ok { mq.RefIncrement() - return nil + return } mq = pm.createPeerQueue(p) pm.peerQueues[p] = mq mq.Startup(pm.ctx, initialEntries) - return mq } -func (pm *PeerManager) stopPeerHandler(p peer.ID) { +// Disconnected is called to remove a peer from the pool. +func (pm *PeerManager) Disconnected(p peer.ID) { + pm.lk.Lock() + defer pm.lk.Unlock() + pq, ok := pm.peerQueues[p] if !ok { // TODO: log error? @@ -192,19 +99,28 @@ func (pm *PeerManager) stopPeerHandler(p peer.ID) { delete(pm.peerQueues, p) } -func (pm *PeerManager) sendMessage(ms *sendPeerMessage) { - if len(ms.targets) == 0 { +// SendMessage is called to send a message to all or some peers in the pool; +// if targets is nil, it sends to all. +func (pm *PeerManager) SendMessage(initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, targets []peer.ID, from uint64) { + pm.lk.Lock() + defer pm.lk.Unlock() + + if len(targets) == 0 { for _, p := range pm.peerQueues { - p.AddMessage(ms.entries, ms.from) + p.AddMessage(entries, from) } } else { - for _, t := range ms.targets { + for _, t := range targets { p, ok := pm.peerQueues[t] if !ok { - log.Infof("tried sending wantlist change to non-partner peer: %s", t) - continue + p = pm.createPeerQueue(t) + pm.peerQueues[t] = p + p.Startup(pm.ctx, initialEntries) + // this is a "0 reference" queue because we haven't actually connected to it + // sending the first message will cause it to connect + p.RefDecrement() } - p.AddMessage(ms.entries, ms.from) + p.AddMessage(entries, from) } } } diff --git a/peermanager/peermanager_test.go b/peermanager/peermanager_test.go index 9617dad3..fa9d7940 100644 --- a/peermanager/peermanager_test.go +++ b/peermanager/peermanager_test.go @@ -79,7 +79,6 @@ func TestAddingAndRemovingPeers(t *testing.T) { tp := testutil.GeneratePeers(5) peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4] peerManager := New(ctx, peerQueueFactory) - peerManager.Startup() peerManager.Connected(peer1, nil) peerManager.Connected(peer2, nil) @@ -118,14 +117,13 @@ func TestAddingAndRemovingPeers(t *testing.T) { func TestSendingMessagesToPeers(t *testing.T) { ctx := context.Background() - messagesSent := make(chan messageSent) + messagesSent := make(chan messageSent, 16) peerQueueFactory := makePeerQueueFactory(messagesSent) tp := testutil.GeneratePeers(5) peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4] peerManager := New(ctx, peerQueueFactory) - peerManager.Startup() peerManager.Connected(peer1, nil) peerManager.Connected(peer2, nil) @@ -134,7 +132,7 @@ func TestSendingMessagesToPeers(t *testing.T) { entries := testutil.GenerateMessageEntries(5, false) ses := testutil.GenerateSessionID() - peerManager.SendMessage(entries, nil, ses) + peerManager.SendMessage(nil, entries, nil, ses) peersReceived := collectAndCheckMessages( ctx, t, messagesSent, entries, ses, 10*time.Millisecond) @@ -155,11 +153,11 @@ func TestSendingMessagesToPeers(t *testing.T) { var peersToSendTo []peer.ID peersToSendTo = append(peersToSendTo, peer1, peer3, peer4) - peerManager.SendMessage(entries, peersToSendTo, ses) + peerManager.SendMessage(nil, entries, peersToSendTo, ses) peersReceived = collectAndCheckMessages( ctx, t, messagesSent, entries, ses, 10*time.Millisecond) - if len(peersReceived) != 2 { + if len(peersReceived) != 3 { t.Fatal("Incorrect number of peers received messages") } @@ -173,7 +171,7 @@ func TestSendingMessagesToPeers(t *testing.T) { t.Fatal("Peers received message but should not have") } - if testutil.ContainsPeer(peersReceived, peer4) { - t.Fatal("Peers targeted received message but was not connected") + if !testutil.ContainsPeer(peersReceived, peer4) { + t.Fatal("Peer should have autoconnected on message send") } } diff --git a/wantmanager/wantmanager.go b/wantmanager/wantmanager.go index 3e5a6c9a..8b248059 100644 --- a/wantmanager/wantmanager.go +++ b/wantmanager/wantmanager.go @@ -20,10 +20,12 @@ const ( maxPriority = math.MaxInt32 ) -// WantSender sends changes out to the network as they get added to the wantlist +// PeerHandler sends changes out to the network as they get added to the wantlist // managed by the WantManager. -type WantSender interface { - SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) +type PeerHandler interface { + Disconnected(p peer.ID) + Connected(p peer.ID, initialEntries []*wantlist.Entry) + SendMessage(initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, targets []peer.ID, from uint64) } type wantMessage interface { @@ -46,7 +48,7 @@ type WantManager struct { ctx context.Context cancel func() - wantSender WantSender + peerHandler PeerHandler wantlistGauge metrics.Gauge } @@ -66,8 +68,8 @@ func New(ctx context.Context) *WantManager { } // SetDelegate specifies who will send want changes out to the internet. -func (wm *WantManager) SetDelegate(wantSender WantSender) { - wm.wantSender = wantSender +func (wm *WantManager) SetDelegate(peerHandler PeerHandler) { + wm.peerHandler = peerHandler } // WantBlocks adds the given cids to the wantlist, tracked by the given session. @@ -145,6 +147,22 @@ func (wm *WantManager) WantCount() int { } } +// Connected is called when a new peer is connected +func (wm *WantManager) Connected(p peer.ID) { + select { + case wm.wantMessages <- &connectedMessage{p}: + case <-wm.ctx.Done(): + } +} + +// Disconnected is called when a peer is disconnected +func (wm *WantManager) Disconnected(p peer.ID) { + select { + case wm.wantMessages <- &disconnectedMessage{p}: + case <-wm.ctx.Done(): + } +} + // Startup starts processing for the WantManager. func (wm *WantManager) Startup() { go wm.run() @@ -214,7 +232,7 @@ func (ws *wantSet) handle(wm *WantManager) { } // broadcast those wantlist changes - wm.wantSender.SendMessage(ws.entries, ws.targets, ws.from) + wm.peerHandler.SendMessage(wm.bcwl.Entries(), ws.entries, ws.targets, ws.from) } type isWantedMessage struct { @@ -250,3 +268,19 @@ type wantCountMessage struct { func (wcm *wantCountMessage) handle(wm *WantManager) { wcm.resp <- wm.wl.Len() } + +type connectedMessage struct { + p peer.ID +} + +func (cm *connectedMessage) handle(wm *WantManager) { + wm.peerHandler.Connected(cm.p, wm.bcwl.Entries()) +} + +type disconnectedMessage struct { + p peer.ID +} + +func (dm *disconnectedMessage) handle(wm *WantManager) { + wm.peerHandler.Disconnected(dm.p) +} diff --git a/wantmanager/wantmanager_test.go b/wantmanager/wantmanager_test.go index 85590bb1..37a1d276 100644 --- a/wantmanager/wantmanager_test.go +++ b/wantmanager/wantmanager_test.go @@ -7,35 +7,41 @@ import ( "testing" "github.com/ipfs/go-bitswap/testutil" + wantlist "github.com/ipfs/go-bitswap/wantlist" bsmsg "github.com/ipfs/go-bitswap/message" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-peer" ) -type fakeWantSender struct { - lk sync.RWMutex - lastWantSet wantSet +type fakePeerHandler struct { + lk sync.RWMutex + lastWantSet wantSet + initialEntries []*wantlist.Entry } -func (fws *fakeWantSender) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) { - fws.lk.Lock() - fws.lastWantSet = wantSet{entries, targets, from} - fws.lk.Unlock() +func (fph *fakePeerHandler) SendMessage(initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, targets []peer.ID, from uint64) { + fph.lk.Lock() + fph.lastWantSet = wantSet{entries, targets, from} + fph.initialEntries = initialEntries + fph.lk.Unlock() } -func (fws *fakeWantSender) getLastWantSet() wantSet { - fws.lk.Lock() - defer fws.lk.Unlock() - return fws.lastWantSet +func (fph *fakePeerHandler) Connected(p peer.ID, initialEntries []*wantlist.Entry) {} +func (fph *fakePeerHandler) Disconnected(p peer.ID) {} + +func (fph *fakePeerHandler) getLastWantSet() wantSet { + fph.lk.Lock() + defer fph.lk.Unlock() + return fph.lastWantSet } func setupTestFixturesAndInitialWantList() ( - context.Context, *fakeWantSender, *WantManager, []cid.Cid, []cid.Cid, []peer.ID, uint64, uint64) { + context.Context, *fakePeerHandler, *WantManager, []cid.Cid, []cid.Cid, []peer.ID, uint64, uint64) { ctx := context.Background() // setup fixtures - wantSender := &fakeWantSender{} + wantSender := &fakePeerHandler{} wantManager := New(ctx) keys := testutil.GenerateCids(10) otherKeys := testutil.GenerateCids(5)