Total wants gauge (ipfs#402)
* feat: total wants gauge

* fix: in gauges count wants regardless of which peers they're sent to

* fix: want block gauge calculation

* refactor: simplify peermanagerwants

This commit was moved from ipfs/go-bitswap@88373cd
dirkmc authored Jun 2, 2020
1 parent 45841da commit 55ed620
Showing 3 changed files with 149 additions and 35 deletions.
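The change in brief: the peer want manager gains a second gauge so that both the total number of wanted CIDs ("wantlist_total") and the number of want-blocks ("want_blocks_total") are tracked, and each CID is counted once no matter how many peers the want was sent to. The tests below drive the manager with an in-memory gauge stub; a minimal sketch of such a stub follows (the count field is what the assertions read; the rest is illustrative, not code from this commit):

package peermanager

// gauge is a minimal in-memory counter standing in for a real metrics gauge.
// The peer want manager only ever calls Inc and Dec on it, and the tests
// read the resulting count directly.
type gauge struct {
	count int
}

func (g *gauge) Inc() { g.count++ }
func (g *gauge) Dec() { g.count-- }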
bitswap/internal/peermanager/peermanager.go (2 additions & 1 deletion)
@@ -52,9 +52,10 @@ type PeerManager struct {
 // New creates a new PeerManager, given a context and a peerQueueFactory.
 func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *PeerManager {
 	wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge()
+	wantBlockGauge := metrics.NewCtx(ctx, "want_blocks_total", "Number of want-blocks in wantlist.").Gauge()
 	return &PeerManager{
 		peerQueues:      make(map[peer.ID]PeerQueue),
-		pwm:             newPeerWantManager(wantGauge),
+		pwm:             newPeerWantManager(wantGauge, wantBlockGauge),
 		createPeerQueue: createPeerQueue,
 		ctx:             ctx,
 		self:            self,
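The want manager only increments and decrements these gauges, so the Gauge type it accepts (defined in peerwantmanager.go, outside the hunks shown here) can be as narrow as the sketch below. This shape is inferred from the Inc/Dec calls in the diff; metrics.NewCtx(...).Gauge() returns a value that satisfies it:

package peermanager

// Gauge keeps track of a metric that increases and decreases incrementally.
// (Assumed shape, inferred from how the two gauges are used in this commit.)
type Gauge interface {
	Inc()
	Dec()
}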
bitswap/internal/peermanager/peerwantmanager.go (78 additions & 21 deletions)
@@ -30,6 +30,8 @@ type peerWantManager struct {
 	// broadcastWants tracks all the current broadcast wants.
 	broadcastWants *cid.Set
 
+	// Keeps track of the number of active want-haves & want-blocks
+	wantGauge Gauge
 	// Keeps track of the number of active want-blocks
 	wantBlockGauge Gauge
 }
@@ -42,11 +44,12 @@ type peerWant struct {
 
 // New creates a new peerWantManager with a Gauge that keeps track of the
 // number of active want-blocks (ie sent but no response received)
-func newPeerWantManager(wantBlockGauge Gauge) *peerWantManager {
+func newPeerWantManager(wantGauge Gauge, wantBlockGauge Gauge) *peerWantManager {
 	return &peerWantManager{
 		broadcastWants: cid.NewSet(),
 		peerWants:      make(map[peer.ID]*peerWant),
 		wantPeers:      make(map[cid.Cid]map[peer.ID]struct{}),
+		wantGauge:      wantGauge,
 		wantBlockGauge: wantBlockGauge,
 	}
 }
@@ -78,17 +81,30 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
 		return
 	}
 
 	// Clean up want-blocks
 	_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
-		// Decrement the gauge by the number of pending want-blocks to the peer
-		pwm.wantBlockGauge.Dec()
 		// Clean up want-blocks from the reverse index
-		pwm.reverseIndexRemove(c, p)
+		removedLastPeer := pwm.reverseIndexRemove(c, p)
+
+		// Decrement the gauges by the number of pending want-blocks to the peer
+		if removedLastPeer {
+			pwm.wantBlockGauge.Dec()
+			if !pwm.broadcastWants.Has(c) {
+				pwm.wantGauge.Dec()
+			}
+		}
 		return nil
 	})
 
-	// Clean up want-haves from the reverse index
+	// Clean up want-haves
 	_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
-		pwm.reverseIndexRemove(c, p)
+		// Clean up want-haves from the reverse index
+		removedLastPeer := pwm.reverseIndexRemove(c, p)
+
+		// Decrement the gauge by the number of pending want-haves to the peer
+		if removedLastPeer && !pwm.broadcastWants.Has(c) {
+			pwm.wantGauge.Dec()
+		}
 		return nil
 	})
 
@@ -105,6 +121,11 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
 		}
 		pwm.broadcastWants.Add(c)
 		unsent = append(unsent, c)
+
+		// Increment the total wants gauge
+		if _, ok := pwm.wantPeers[c]; !ok {
+			pwm.wantGauge.Inc()
+		}
 	}
 
 	if len(unsent) == 0 {
@@ -151,17 +172,22 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
 			// Record that the CID was sent as a want-block
 			pws.wantBlocks.Add(c)
 
-			// Update the reverse index
-			pwm.reverseIndexAdd(c, p)
-
 			// Add the CID to the results
 			fltWantBlks = append(fltWantBlks, c)
 
 			// Make sure the CID is no longer recorded as a want-have
 			pws.wantHaves.Remove(c)
 
-			// Increment the count of want-blocks
-			pwm.wantBlockGauge.Inc()
+			// Update the reverse index
+			isNew := pwm.reverseIndexAdd(c, p)
+
+			// Increment the want gauges
+			if isNew {
+				pwm.wantBlockGauge.Inc()
+				if !pwm.broadcastWants.Has(c) {
+					pwm.wantGauge.Inc()
+				}
+			}
 		}
 	}
 
@@ -178,11 +204,16 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
 			// Record that the CID was sent as a want-have
 			pws.wantHaves.Add(c)
 
-			// Update the reverse index
-			pwm.reverseIndexAdd(c, p)
-
 			// Add the CID to the results
 			fltWantHvs = append(fltWantHvs, c)
+
+			// Update the reverse index
+			isNew := pwm.reverseIndexAdd(c, p)
+
+			// Increment the total wants gauge
+			if isNew && !pwm.broadcastWants.Has(c) {
+				pwm.wantGauge.Inc()
+			}
 		}
 	}
 
@@ -207,6 +238,9 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
 		}
 	}
 
+	cancelledWantBlocks := cid.NewSet()
+	cancelledWantHaves := cid.NewSet()
+
 	// Send cancels to a particular peer
 	send := func(p peer.ID, pws *peerWant) {
 		// Start from the broadcast cancels
@@ -216,13 +250,15 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
 		for _, c := range cancelKs {
 			// Check if a want was sent for the key
 			wantBlock := pws.wantBlocks.Has(c)
-			if !wantBlock && !pws.wantHaves.Has(c) {
-				continue
-			}
+			wantHave := pws.wantHaves.Has(c)
 
-			// Update the want gauge.
+			// Update the want gauges
 			if wantBlock {
-				pwm.wantBlockGauge.Dec()
+				cancelledWantBlocks.Add(c)
+			} else if wantHave {
+				cancelledWantHaves.Add(c)
+			} else {
+				continue
 			}
 
 			// Unconditionally remove from the want lists.
@@ -271,33 +307,54 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
 	// Remove cancelled broadcast wants
 	for _, c := range broadcastCancels {
 		pwm.broadcastWants.Remove(c)
+
+		// Decrement the total wants gauge for broadcast wants
+		if !cancelledWantHaves.Has(c) && !cancelledWantBlocks.Has(c) {
+			pwm.wantGauge.Dec()
+		}
 	}
 
+	// Decrement the total wants gauge for peer wants
+	_ = cancelledWantHaves.ForEach(func(c cid.Cid) error {
+		pwm.wantGauge.Dec()
+		return nil
+	})
+	_ = cancelledWantBlocks.ForEach(func(c cid.Cid) error {
+		pwm.wantGauge.Dec()
+		pwm.wantBlockGauge.Dec()
+		return nil
+	})
+
 	// Finally, batch-remove the reverse-index. There's no need to
 	// clear this index peer-by-peer.
 	for _, c := range cancelKs {
 		delete(pwm.wantPeers, c)
 	}
+
 }
 
 // Add the peer to the list of peers that have sent a want with the cid
-func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) {
+func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) bool {
 	peers, ok := pwm.wantPeers[c]
 	if !ok {
 		peers = make(map[peer.ID]struct{}, 10)
 		pwm.wantPeers[c] = peers
 	}
 	peers[p] = struct{}{}
+	return !ok
}
 
 // Remove the peer from the list of peers that have sent a want with the cid
-func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) {
+func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) bool {
 	if peers, ok := pwm.wantPeers[c]; ok {
 		delete(peers, p)
 		if len(peers) == 0 {
 			delete(pwm.wantPeers, c)
+			return true
 		}
 	}
+
+	return false
 }
 
 // GetWantBlocks returns the set of all want-blocks sent to all peers
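Taken together, the changes above make both gauges count distinct CIDs rather than per-peer sends: a CID contributes one to wantGauge while it is a broadcast want or wanted by at least one peer, and one to wantBlockGauge while at least one peer has an outstanding want-block for it. A toy model of that rule for a single CID (illustrative only, not code from this commit):

package main

import "fmt"

// cidWantState models the want state of a single CID across all peers.
type cidWantState struct {
	broadcast  bool // CID is in the broadcast want set
	wantHaves  int  // peers with an outstanding want-have for the CID
	wantBlocks int  // peers with an outstanding want-block for the CID
}

// counted reports whether the CID should be counted in the total wants gauge.
func (s cidWantState) counted() bool {
	return s.broadcast || s.wantHaves > 0 || s.wantBlocks > 0
}

// countedBlock reports whether the CID should be counted in the want-block gauge.
func (s cidWantState) countedBlock() bool {
	return s.wantBlocks > 0
}

func main() {
	var s cidWantState
	s.wantBlocks++ // want-block sent to a first peer
	s.wantBlocks++ // same CID sent to a second peer: gauges stay unchanged
	fmt.Println(s.counted(), s.countedBlock()) // true true

	s.wantBlocks = 0 // a cancel clears the CID for every peer
	fmt.Println(s.counted(), s.countedBlock()) // false false
}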
bitswap/internal/peermanager/peerwantmanager_test.go (69 additions & 13 deletions)
@@ -56,7 +56,7 @@ func clearSent(pqs map[peer.ID]PeerQueue) {
 }
 
 func TestEmpty(t *testing.T) {
-	pwm := newPeerWantManager(&gauge{})
+	pwm := newPeerWantManager(&gauge{}, &gauge{})
 
 	if len(pwm.getWantBlocks()) > 0 {
 		t.Fatal("Expected GetWantBlocks() to have length 0")
@@ -67,7 +67,7 @@ func TestEmpty(t *testing.T) {
 }
 
 func TestPWMBroadcastWantHaves(t *testing.T) {
-	pwm := newPeerWantManager(&gauge{})
+	pwm := newPeerWantManager(&gauge{}, &gauge{})
 
 	peers := testutil.GeneratePeers(3)
 	cids := testutil.GenerateCids(2)
@@ -179,7 +179,7 @@ func TestPWMBroadcastWantHaves(t *testing.T) {
 }
 
 func TestPWMSendWants(t *testing.T) {
-	pwm := newPeerWantManager(&gauge{})
+	pwm := newPeerWantManager(&gauge{}, &gauge{})
 
 	peers := testutil.GeneratePeers(2)
 	p0 := peers[0]
@@ -259,7 +259,7 @@ func TestPWMSendWants(t *testing.T) {
 }
 
 func TestPWMSendCancels(t *testing.T) {
-	pwm := newPeerWantManager(&gauge{})
+	pwm := newPeerWantManager(&gauge{}, &gauge{})
 
 	peers := testutil.GeneratePeers(2)
 	p0 := peers[0]
@@ -338,10 +338,12 @@ func TestPWMSendCancels(t *testing.T) {
 
 func TestStats(t *testing.T) {
 	g := &gauge{}
-	pwm := newPeerWantManager(g)
+	wbg := &gauge{}
+	pwm := newPeerWantManager(g, wbg)
 
 	peers := testutil.GeneratePeers(2)
 	p0 := peers[0]
+	p1 := peers[1]
 	cids := testutil.GenerateCids(2)
 	cids2 := testutil.GenerateCids(2)
 
@@ -353,30 +355,84 @@ func TestStats(t *testing.T) {
 	// Send 2 want-blocks and 2 want-haves to p0
 	pwm.sendWants(p0, cids, cids2)
 
-	if g.count != 2 {
+	if g.count != 4 {
+		t.Fatal("Expected 4 wants")
+	}
+	if wbg.count != 2 {
 		t.Fatal("Expected 2 want-blocks")
 	}
 
 	// Send 1 old want-block and 2 new want-blocks to p0
 	cids3 := testutil.GenerateCids(2)
 	pwm.sendWants(p0, append(cids3, cids[0]), []cid.Cid{})
 
-	if g.count != 4 {
+	if g.count != 6 {
+		t.Fatal("Expected 6 wants")
+	}
+	if wbg.count != 4 {
 		t.Fatal("Expected 4 want-blocks")
 	}
 
+	// Broadcast 1 old want-have and 2 new want-haves
+	cids4 := testutil.GenerateCids(2)
+	pwm.broadcastWantHaves(append(cids4, cids2[0]))
+	if g.count != 8 {
+		t.Fatal("Expected 8 wants")
+	}
+	if wbg.count != 4 {
+		t.Fatal("Expected 4 want-blocks")
+	}
+
+	// Add a second peer
+	pwm.addPeer(pq, p1)
+
+	if g.count != 8 {
+		t.Fatal("Expected 8 wants")
+	}
+	if wbg.count != 4 {
+		t.Fatal("Expected 4 want-blocks")
+	}
+
 	// Cancel 1 want-block that was sent to p0
 	// and 1 want-block that was not sent
-	cids4 := testutil.GenerateCids(1)
-	pwm.sendCancels(append(cids4, cids[0]))
+	cids5 := testutil.GenerateCids(1)
+	pwm.sendCancels(append(cids5, cids[0]))
 
-	if g.count != 3 {
-		t.Fatal("Expected 3 want-blocks", g.count)
+	if g.count != 7 {
+		t.Fatal("Expected 7 wants")
 	}
+	if wbg.count != 3 {
+		t.Fatal("Expected 3 want-blocks")
+	}
 
 	// Remove first peer
 	pwm.removePeer(p0)
 
-	if g.count != 0 {
-		t.Fatal("Expected all want-blocks to be removed with peer", g.count)
+	// Should still have 3 broadcast wants
+	if g.count != 3 {
+		t.Fatal("Expected 3 wants")
	}
+	if wbg.count != 0 {
+		t.Fatal("Expected all want-blocks to be removed")
+	}
+
+	// Remove second peer
+	pwm.removePeer(p1)
+
+	// Should still have 3 broadcast wants
+	if g.count != 3 {
+		t.Fatal("Expected 3 wants")
+	}
+	if wbg.count != 0 {
+		t.Fatal("Expected 0 want-blocks")
+	}
+
+	// Cancel one remaining broadcast want-have
+	pwm.sendCancels(cids2[:1])
+	if g.count != 2 {
+		t.Fatal("Expected 2 wants")
+	}
+	if wbg.count != 0 {
+		t.Fatal("Expected 0 want-blocks")
+	}
 }
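For readers following the updated TestStats assertions, the expected counts fall out of the calls above like this (wants is g.count, want-blocks is wbg.count):

sendWants(p0, cids, cids2)                  -> 4 wants, 2 want-blocks
sendWants(p0, append(cids3, cids[0]), ...)  -> 6 wants, 4 want-blocks (cids[0] already counted)
broadcastWantHaves(append(cids4, cids2[0])) -> 8 wants, 4 want-blocks (cids2[0] already counted)
addPeer(pq, p1)                             -> unchanged; the re-broadcast CIDs are already counted
sendCancels(append(cids5, cids[0]))         -> 7 wants, 3 want-blocks (cids5 was never sent)
removePeer(p0)                              -> 3 wants, 0 want-blocks; only the broadcast CIDs remain wanted
removePeer(p1)                              -> unchanged; broadcast wants do not depend on any peer
sendCancels(cids2[:1])                      -> 2 wants, 0 want-blocks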
