Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
fix(wantlist): remove races on setup
Browse files Browse the repository at this point in the history
fix race conditions while setting up wantlists by creating peer queues on demand

BREAKING CHANGE: PeerManager SendMessage signature changed

fix #51
  • Loading branch information
hannahhoward committed Feb 20, 2019
1 parent 722239f commit 32d0c18
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 151 deletions.
6 changes: 2 additions & 4 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}

bs.wm.SetDelegate(bs.pm)
bs.pm.Startup()
bs.wm.Startup()
bs.pqm.Startup()
network.SetDelegate(bs)
Expand Down Expand Up @@ -361,14 +360,13 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) {

// PeerConnected is called by the network to warn bitswap that a new peer
// has connected. Registration now goes through the WantManager, which
// serializes it on its run loop; the old direct pm.Connected call with
// CurrentBroadcastWants raced with wantlist setup and was removed.
func (bs *Bitswap) PeerConnected(p peer.ID) {
	bs.wm.Connected(p)
	bs.engine.PeerConnected(p)
}

// PeerDisconnected is called by the network to warn bitswap that a peer
// has disconnected. Like PeerConnected, teardown is routed through the
// WantManager rather than calling the PeerManager directly.
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
	bs.wm.Disconnected(p)
	bs.engine.PeerDisconnected(p)
}

Expand Down
154 changes: 35 additions & 119 deletions peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package peermanager

import (
"context"
"sync"

bsmsg "github.com/ipfs/go-bitswap/message"
wantlist "github.com/ipfs/go-bitswap/wantlist"
Expand Down Expand Up @@ -34,150 +35,56 @@ type peerMessage interface {

// PeerManager manages a pool of peers and sends messages to peers in the pool.
type PeerManager struct {
	// sync channel for Run loop
	peerMessages chan peerMessage

	// peerQueues maps connected peers to their outgoing message queues.
	// NOTE(review): the pasted diff declared this field twice (old and new
	// hunk lines fused); it is declared exactly once here. Guard any access
	// outside the run loop with lk.
	peerQueues map[peer.ID]PeerQueue
	lk         sync.RWMutex

	createPeerQueue PeerQueueFactory
	ctx             context.Context
	cancel          func()
}

// New creates a new PeerManager, given a context and a peerQueueFactory.
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
	// Derive a cancellable context so Shutdown can stop the run loop and
	// every peer queue started under it.
	ctx, cancel := context.WithCancel(ctx)
	return &PeerManager{
		peerMessages:    make(chan peerMessage, 10), // small buffer so senders rarely block
		peerQueues:      make(map[peer.ID]PeerQueue),
		createPeerQueue: createPeerQueue,
		ctx:             ctx,
		cancel:          cancel,
	}
}

// ConnectedPeers returns a list of peers this PeerManager is managing.
func (pm *PeerManager) ConnectedPeers() []peer.ID {
	// Reply channel is buffered so the run loop can answer without blocking.
	out := make(chan []peer.ID, 1)
	request := &getPeersMessage{out}

	// Hand the request to the run loop, bailing out on shutdown.
	select {
	case <-pm.ctx.Done():
		return nil
	case pm.peerMessages <- request:
	}

	// Wait for the reply, again giving up if the manager shuts down first.
	select {
	case <-pm.ctx.Done():
		return nil
	case peers := <-out:
		return peers
	}
}

// Connected is called to add a new peer to the pool, and send it an initial set
// of wants.
func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
	// Forward the connection event to the run loop; drop it on shutdown.
	msg := &connectPeerMessage{p, initialEntries}
	select {
	case <-pm.ctx.Done():
	case pm.peerMessages <- msg:
	}
}

// Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) {
	// Forward the disconnection event to the run loop; drop it on shutdown.
	msg := &disconnectPeerMessage{p}
	select {
	case <-pm.ctx.Done():
	case pm.peerMessages <- msg:
	}
}

// SendMessage is called to send a message to all or some peers in the pool;
// if targets is nil, it sends to all.
func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
	msg := &sendPeerMessage{
		entries: entries,
		targets: targets,
		from:    from,
	}
	// Queue the send on the run loop unless the manager is shutting down.
	select {
	case <-pm.ctx.Done():
	case pm.peerMessages <- msg:
	}
}

// Startup enables the run loop for the PeerManager - no processing will occur
// if startup is not called.
func (pm *PeerManager) Startup() {
	// The loop exits when pm.ctx is cancelled (see Shutdown).
	go pm.run()
}

// Shutdown shuts down processing for the PeerManager.
func (pm *PeerManager) Shutdown() {
	// Cancelling the context stops the run loop and everything started
	// under pm.ctx.
	pm.cancel()
}

// run is the PeerManager event loop: it serializes all access to internal
// state by handling one peerMessage at a time until the context is cancelled.
func (pm *PeerManager) run() {
	for {
		select {
		case message := <-pm.peerMessages:
			message.handle(pm)
		case <-pm.ctx.Done():
			return
		}
	}
}
pm.lk.RLock()
defer pm.lk.RUnlock()

// sendPeerMessage asks the run loop to deliver wantlist entries to the
// given target peers (all managed peers when targets is empty).
type sendPeerMessage struct {
	entries []*bsmsg.Entry
	targets []peer.ID
	from    uint64 // presumably the originating session id — see SendMessage callers
}

// handle dispatches the send to pm.sendMessage; called only from the run loop.
func (s *sendPeerMessage) handle(pm *PeerManager) {
	pm.sendMessage(s)
}

// connectPeerMessage tells the run loop that peer p connected and should be
// given a queue primed with initialEntries.
type connectPeerMessage struct {
	p              peer.ID
	initialEntries []*wantlist.Entry
}

// handle starts (or re-references) the peer's queue; called only from the run loop.
func (c *connectPeerMessage) handle(pm *PeerManager) {
	pm.startPeerHandler(c.p, c.initialEntries)
}

// disconnectPeerMessage tells the run loop that peer p disconnected.
type disconnectPeerMessage struct {
	p peer.ID
}

// handle tears down (or de-references) the peer's queue; run-loop only.
func (dc *disconnectPeerMessage) handle(pm *PeerManager) {
	pm.stopPeerHandler(dc.p)
}

// getPeersMessage asks the run loop for the current peer list, delivered
// on peerResp.
type getPeersMessage struct {
	peerResp chan<- []peer.ID
}

// handle answers the request with a snapshot of peer IDs; run-loop only.
func (gp *getPeersMessage) handle(pm *PeerManager) {
	pm.getPeers(gp.peerResp)
}

// getPeers collects the IDs of all currently-managed peers and delivers them
// on peerResp. It must only be called from the run loop, which owns
// pm.peerQueues. (The pasted diff had a stray "return peers" from the new
// hunk fused in — this function has no result type, so that line cannot
// compile and is removed.)
func (pm *PeerManager) getPeers(peerResp chan<- []peer.ID) {
	peers := make([]peer.ID, 0, len(pm.peerQueues))
	for p := range pm.peerQueues {
		peers = append(peers, p)
	}
	peerResp <- peers
}

func (pm *PeerManager) startPeerHandler(p peer.ID, initialEntries []*wantlist.Entry) PeerQueue {
// Connected is called to add a new peer to the pool, and send it an initial set
// of wants.
func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
pm.lk.Lock()
defer pm.lk.Unlock()

mq, ok := pm.peerQueues[p]
if ok {
mq.RefIncrement()
return nil
return
}

mq = pm.createPeerQueue(p)
pm.peerQueues[p] = mq
mq.Startup(pm.ctx, initialEntries)
return mq
}

func (pm *PeerManager) stopPeerHandler(p peer.ID) {
// Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) {
pm.lk.Lock()
defer pm.lk.Unlock()

pq, ok := pm.peerQueues[p]
if !ok {
// TODO: log error?
Expand All @@ -192,19 +99,28 @@ func (pm *PeerManager) stopPeerHandler(p peer.ID) {
delete(pm.peerQueues, p)
}

func (pm *PeerManager) sendMessage(ms *sendPeerMessage) {
if len(ms.targets) == 0 {
// SendMessage is called to send a message to all or some peers in the pool;
// if targets is nil, it sends to all.
func (pm *PeerManager) SendMessage(initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
pm.lk.Lock()
defer pm.lk.Unlock()

if len(targets) == 0 {
for _, p := range pm.peerQueues {
p.AddMessage(ms.entries, ms.from)
p.AddMessage(entries, from)
}
} else {
for _, t := range ms.targets {
for _, t := range targets {
p, ok := pm.peerQueues[t]
if !ok {
log.Infof("tried sending wantlist change to non-partner peer: %s", t)
continue
p = pm.createPeerQueue(t)
pm.peerQueues[t] = p
p.Startup(pm.ctx, initialEntries)
// this is a "0 reference" queue because we haven't actually connected to it
// sending the first message will cause it to connect
p.RefDecrement()
}
p.AddMessage(ms.entries, ms.from)
p.AddMessage(entries, from)
}
}
}
14 changes: 6 additions & 8 deletions peermanager/peermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func TestAddingAndRemovingPeers(t *testing.T) {
tp := testutil.GeneratePeers(5)
peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4]
peerManager := New(ctx, peerQueueFactory)
peerManager.Startup()

peerManager.Connected(peer1, nil)
peerManager.Connected(peer2, nil)
Expand Down Expand Up @@ -118,14 +117,13 @@ func TestAddingAndRemovingPeers(t *testing.T) {

func TestSendingMessagesToPeers(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan messageSent)
messagesSent := make(chan messageSent, 16)
peerQueueFactory := makePeerQueueFactory(messagesSent)

tp := testutil.GeneratePeers(5)

peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4]
peerManager := New(ctx, peerQueueFactory)
peerManager.Startup()

peerManager.Connected(peer1, nil)
peerManager.Connected(peer2, nil)
Expand All @@ -134,7 +132,7 @@ func TestSendingMessagesToPeers(t *testing.T) {
entries := testutil.GenerateMessageEntries(5, false)
ses := testutil.GenerateSessionID()

peerManager.SendMessage(entries, nil, ses)
peerManager.SendMessage(nil, entries, nil, ses)

peersReceived := collectAndCheckMessages(
ctx, t, messagesSent, entries, ses, 10*time.Millisecond)
Expand All @@ -155,11 +153,11 @@ func TestSendingMessagesToPeers(t *testing.T) {

var peersToSendTo []peer.ID
peersToSendTo = append(peersToSendTo, peer1, peer3, peer4)
peerManager.SendMessage(entries, peersToSendTo, ses)
peerManager.SendMessage(nil, entries, peersToSendTo, ses)
peersReceived = collectAndCheckMessages(
ctx, t, messagesSent, entries, ses, 10*time.Millisecond)

if len(peersReceived) != 2 {
if len(peersReceived) != 3 {
t.Fatal("Incorrect number of peers received messages")
}

Expand All @@ -173,7 +171,7 @@ func TestSendingMessagesToPeers(t *testing.T) {
t.Fatal("Peers received message but should not have")
}

if testutil.ContainsPeer(peersReceived, peer4) {
t.Fatal("Peers targeted received message but was not connected")
if !testutil.ContainsPeer(peersReceived, peer4) {
t.Fatal("Peer should have autoconnected on message send")
}
}
48 changes: 41 additions & 7 deletions wantmanager/wantmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ const (
maxPriority = math.MaxInt32
)

// WantSender sends changes out to the network as they get added to the wantlist
// PeerHandler sends changes out to the network as they get added to the wantlist
// managed by the WantManager.
type WantSender interface {
SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64)
type PeerHandler interface {
Disconnected(p peer.ID)
Connected(p peer.ID, initialEntries []*wantlist.Entry)
SendMessage(initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, targets []peer.ID, from uint64)
}

type wantMessage interface {
Expand All @@ -46,7 +48,7 @@ type WantManager struct {
ctx context.Context
cancel func()

wantSender WantSender
peerHandler PeerHandler
wantlistGauge metrics.Gauge
}

Expand All @@ -66,8 +68,8 @@ func New(ctx context.Context) *WantManager {
}

// SetDelegate specifies who will send want changes out to the internet.
func (wm *WantManager) SetDelegate(wantSender WantSender) {
wm.wantSender = wantSender
func (wm *WantManager) SetDelegate(peerHandler PeerHandler) {
wm.peerHandler = peerHandler
}

// WantBlocks adds the given cids to the wantlist, tracked by the given session.
Expand Down Expand Up @@ -145,6 +147,22 @@ func (wm *WantManager) WantCount() int {
}
}

// Connected is called when a new peer is connected
func (wm *WantManager) Connected(p peer.ID) {
	// Forward the event to the run loop unless the manager has shut down.
	msg := &connectedMessage{p}
	select {
	case <-wm.ctx.Done():
	case wm.wantMessages <- msg:
	}
}

// Disconnected is called when a peer is disconnected
func (wm *WantManager) Disconnected(p peer.ID) {
	// Forward the event to the run loop unless the manager has shut down.
	msg := &disconnectedMessage{p}
	select {
	case <-wm.ctx.Done():
	case wm.wantMessages <- msg:
	}
}

// Startup starts processing for the WantManager.
func (wm *WantManager) Startup() {
go wm.run()
Expand Down Expand Up @@ -214,7 +232,7 @@ func (ws *wantSet) handle(wm *WantManager) {
}

// broadcast those wantlist changes
wm.wantSender.SendMessage(ws.entries, ws.targets, ws.from)
wm.peerHandler.SendMessage(wm.bcwl.Entries(), ws.entries, ws.targets, ws.from)
}

type isWantedMessage struct {
Expand Down Expand Up @@ -250,3 +268,19 @@ type wantCountMessage struct {
// handle replies with the current wantlist length; run-loop only.
func (wcm *wantCountMessage) handle(wm *WantManager) {
	wcm.resp <- wm.wl.Len()
}

// connectedMessage tells the run loop that peer p has connected.
type connectedMessage struct {
	p peer.ID
}

// handle registers the peer with the peer handler, priming it with the
// current broadcast wantlist; run-loop only.
func (cm *connectedMessage) handle(wm *WantManager) {
	wm.peerHandler.Connected(cm.p, wm.bcwl.Entries())
}

// disconnectedMessage tells the run loop that peer p has disconnected.
type disconnectedMessage struct {
	p peer.ID
}

// handle removes the peer from the peer handler's pool; run-loop only.
func (dm *disconnectedMessage) handle(wm *WantManager) {
	wm.peerHandler.Disconnected(dm.p)
}
Loading

0 comments on commit 32d0c18

Please sign in to comment.