This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

fix want gauge calculation #416

Merged
merged 3 commits into from Jun 10, 2020
151 changes: 88 additions & 63 deletions internal/peermanager/peerwantmanager.go
@@ -84,25 +84,28 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
 	// Clean up want-blocks
 	_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
 		// Clean up want-blocks from the reverse index
-		removedLastPeer := pwm.reverseIndexRemove(c, p)
+		pwm.reverseIndexRemove(c, p)
 
 		// Decrement the gauges by the number of pending want-blocks to the peer
-		if removedLastPeer {
+		peersWantingBlock, peersWantingHave := pwm.peersWanting(c)
+		if peersWantingBlock == 0 {
 			pwm.wantBlockGauge.Dec()
-			if !pwm.broadcastWants.Has(c) {
+			if peersWantingHave == 0 && !pwm.broadcastWants.Has(c) {
 				pwm.wantGauge.Dec()
 			}
 		}
 
 		return nil
 	})
 
 	// Clean up want-haves
 	_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
 		// Clean up want-haves from the reverse index
-		removedLastPeer := pwm.reverseIndexRemove(c, p)
+		pwm.reverseIndexRemove(c, p)
 
 		// Decrement the gauge by the number of pending want-haves to the peer
-		if removedLastPeer && !pwm.broadcastWants.Has(c) {
+		peersWantingBlock, peersWantingHave := pwm.peersWanting(c)
+		if peersWantingBlock == 0 && peersWantingHave == 0 && !pwm.broadcastWants.Has(c) {
 			pwm.wantGauge.Dec()
 		}
 		return nil
@@ -122,8 +125,9 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
 		pwm.broadcastWants.Add(c)
 		unsent = append(unsent, c)
 
-		// Increment the total wants gauge
+		// If no peer has a pending want for the key
 		if _, ok := pwm.wantPeers[c]; !ok {
+			// Increment the total wants gauge
 			pwm.wantGauge.Inc()
 		}
 	}
@@ -168,27 +172,30 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
 	// Iterate over the requested want-blocks
 	for _, c := range wantBlocks {
 		// If the want-block hasn't been sent to the peer
-		if !pws.wantBlocks.Has(c) {
-			// Record that the CID was sent as a want-block
-			pws.wantBlocks.Add(c)
-
-			// Add the CID to the results
-			fltWantBlks = append(fltWantBlks, c)
-
-			// Make sure the CID is no longer recorded as a want-have
-			pws.wantHaves.Remove(c)
+		if pws.wantBlocks.Has(c) {
+			continue
+		}
 
-			// Update the reverse index
-			isNew := pwm.reverseIndexAdd(c, p)
-
-			// Increment the want gauges
-			if isNew {
-				pwm.wantBlockGauge.Inc()
-				if !pwm.broadcastWants.Has(c) {
-					pwm.wantGauge.Inc()
-				}
-			}
-		}
+		// Increment the want gauges
+		peersWantingBlock, peersWantingHave := pwm.peersWanting(c)
+		if peersWantingBlock == 0 {
+			pwm.wantBlockGauge.Inc()
+			if peersWantingHave == 0 && !pwm.broadcastWants.Has(c) {
+				pwm.wantGauge.Inc()
+			}
+		}
+
+		// Make sure the CID is no longer recorded as a want-have
+		pws.wantHaves.Remove(c)
+
+		// Record that the CID was sent as a want-block
+		pws.wantBlocks.Add(c)
+
+		// Add the CID to the results
+		fltWantBlks = append(fltWantBlks, c)
+
+		// Update the reverse index
+		pwm.reverseIndexAdd(c, p)
 	}
 
 	// Iterate over the requested want-haves
@@ -201,19 +208,20 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
 
 		// If the CID has not been sent as a want-block or want-have
 		if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
+			// Increment the total wants gauge
+			peersWantingBlock, peersWantingHave := pwm.peersWanting(c)
+			if peersWantingHave == 0 && !pwm.broadcastWants.Has(c) && peersWantingBlock == 0 {
+				pwm.wantGauge.Inc()
+			}
+
 			// Record that the CID was sent as a want-have
 			pws.wantHaves.Add(c)
 
 			// Add the CID to the results
 			fltWantHvs = append(fltWantHvs, c)
 
 			// Update the reverse index
-			isNew := pwm.reverseIndexAdd(c, p)
-
-			// Increment the total wants gauge
-			if isNew && !pwm.broadcastWants.Has(c) {
-				pwm.wantGauge.Inc()
-			}
+			pwm.reverseIndexAdd(c, p)
 		}
 	}
 
@@ -228,6 +236,14 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
 		return
 	}
 
+	// Record how many peers have a pending want-block and want-have for each
+	// key to be cancelled
+	peersWantingBefore := make(map[cid.Cid][]int, len(cancelKs))
+	for _, c := range cancelKs {
+		blks, haves := pwm.peersWanting(c)
+		peersWantingBefore[c] = []int{blks, haves}
+	}
+
 	// Create a buffer to use for filtering cancels per peer, with the
 	// broadcast wants at the front of the buffer (broadcast wants are sent to
 	// all peers)
@@ -238,9 +254,6 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
 		}
 	}
 
-	cancelledWantBlocks := cid.NewSet()
-	cancelledWantHaves := cid.NewSet()
-
 	// Send cancels to a particular peer
 	send := func(p peer.ID, pws *peerWant) {
 		// Start from the broadcast cancels
@@ -249,15 +262,7 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
 		// For each key to be cancelled
 		for _, c := range cancelKs {
 			// Check if a want was sent for the key
-			wantBlock := pws.wantBlocks.Has(c)
-			wantHave := pws.wantHaves.Has(c)
-
-			// Update the want gauges
-			if wantBlock {
-				cancelledWantBlocks.Add(c)
-			} else if wantHave {
-				cancelledWantHaves.Add(c)
-			} else {
+			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
 				continue
 			}
 
@@ -304,33 +309,56 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
 		}
 	}
 
-	// Remove cancelled broadcast wants
-	for _, c := range broadcastCancels {
-		pwm.broadcastWants.Remove(c)
-
-		// Decrement the total wants gauge for broadcast wants
-		if !cancelledWantHaves.Has(c) && !cancelledWantBlocks.Has(c) {
-			pwm.wantGauge.Dec()
-		}
-	}
-
-	// Decrement the total wants gauge for peer wants
-	_ = cancelledWantHaves.ForEach(func(c cid.Cid) error {
-		pwm.wantGauge.Dec()
-		return nil
-	})
-	_ = cancelledWantBlocks.ForEach(func(c cid.Cid) error {
-		pwm.wantGauge.Dec()
-		pwm.wantBlockGauge.Dec()
-		return nil
-	})
+	// Decrement the wants gauges
+	for _, c := range cancelKs {
+		before := peersWantingBefore[c]
+		peersWantingBlockBefore := before[0]
+		peersWantingHaveBefore := before[1]
+
+		// If there were any peers that had a pending want-block for the key
+		if peersWantingBlockBefore > 0 {
+			// Decrement the want-block gauge
+			pwm.wantBlockGauge.Dec()
+		}
+
+		// If there was a peer that had a pending want or it was a broadcast want
+		if peersWantingBlockBefore > 0 || peersWantingHaveBefore > 0 || pwm.broadcastWants.Has(c) {
+			// Decrement the total wants gauge
+			pwm.wantGauge.Dec()
+		}
+	}
+
+	// Remove cancelled broadcast wants
+	for _, c := range broadcastCancels {
+		pwm.broadcastWants.Remove(c)
+	}
 
-	// Finally, batch-remove the reverse-index. There's no need to
-	// clear this index peer-by-peer.
+	// Batch-remove the reverse-index. There's no need to clear this index
+	// peer-by-peer.
 	for _, c := range cancelKs {
 		delete(pwm.wantPeers, c)
 	}
 }
 
+// peersWanting counts how many peers have a pending want-block and want-have
+// for the given CID
+func (pwm *peerWantManager) peersWanting(c cid.Cid) (int, int) {
+	blockCount := 0
+	haveCount := 0
+	for p := range pwm.wantPeers[c] {
+		pws, ok := pwm.peerWants[p]
+		if !ok {
+			continue
+		}

+		if pws.wantBlocks.Has(c) {
+			blockCount++
+		} else if pws.wantHaves.Has(c) {
+			haveCount++
+		}
+	}
+
+	return blockCount, haveCount
+}
+
 // Add the peer to the list of peers that have sent a want with the cid
@@ -345,16 +373,13 @@ func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) bool {
 }
 
 // Remove the peer from the list of peers that have sent a want with the cid
-func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) bool {
+func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) {
 	if peers, ok := pwm.wantPeers[c]; ok {
 		delete(peers, p)
 		if len(peers) == 0 {
 			delete(pwm.wantPeers, c)
-			return true
 		}
 	}
-
-	return false
 }
 
 // GetWantBlocks returns the set of all want-blocks sent to all peers
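
For reference, the invariant that the updated gauge logic maintains is: wantBlockGauge tracks the number of CIDs that at least one peer still wants as a want-block, and wantGauge tracks the number of CIDs that are wanted by at least one peer (as a want-block or want-have) or are broadcast wants. The sketch below restates that invariant as a recount over the reverse index; it is a hypothetical helper for illustration only (expectedGaugeCounts is not part of this PR), assuming the peerWantManager fields used in the diff above.

// expectedGaugeCounts is a hypothetical helper (not part of this PR) that
// recomputes what the gauges should read from scratch: wantBlockGauge counts
// keys wanted as a want-block by at least one peer; wantGauge counts keys
// wanted by at least one peer or broadcast.
func expectedGaugeCounts(pwm *peerWantManager) (wantCount, wantBlockCount int) {
	wanted := cid.NewSet()

	// Every broadcast want counts toward the total wants gauge.
	_ = pwm.broadcastWants.ForEach(func(c cid.Cid) error {
		wanted.Add(c)
		return nil
	})

	// Every key in the reverse index that some peer still wants counts toward
	// the total wants gauge; keys wanted as a want-block by at least one peer
	// also count toward the want-block gauge.
	for c := range pwm.wantPeers {
		peersWantingBlock, peersWantingHave := pwm.peersWanting(c)
		if peersWantingBlock > 0 {
			wantBlockCount++
		}
		if peersWantingBlock > 0 || peersWantingHave > 0 {
			wanted.Add(c)
		}
	}

	return wanted.Len(), wantBlockCount
}
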
78 changes: 78 additions & 0 deletions internal/peermanager/peerwantmanager_test.go
@@ -436,3 +436,81 @@ func TestStats(t *testing.T) {
 		t.Fatal("Expected 0 want-blocks")
 	}
 }
+
+func TestStatsOverlappingWantBlockWantHave(t *testing.T) {
+	g := &gauge{}
+	wbg := &gauge{}
+	pwm := newPeerWantManager(g, wbg)
+
+	peers := testutil.GeneratePeers(2)
+	p0 := peers[0]
+	p1 := peers[1]
+	cids := testutil.GenerateCids(2)
+	cids2 := testutil.GenerateCids(2)
+
+	pwm.addPeer(&mockPQ{}, p0)
+	pwm.addPeer(&mockPQ{}, p1)
+
+	// Send 2 want-blocks and 2 want-haves to p0
+	pwm.sendWants(p0, cids, cids2)
+
+	// Send the opposite:
+	// 2 want-haves and 2 want-blocks to p1
+	pwm.sendWants(p1, cids2, cids)
+
+	if g.count != 4 {
+		t.Fatal("Expected 4 wants")
+	}
+	if wbg.count != 4 {
+		t.Fatal("Expected 4 want-blocks")
+	}
+
+	// Cancel 1 of each group of cids
+	pwm.sendCancels([]cid.Cid{cids[0], cids2[0]})
+
+	if g.count != 2 {
+		t.Fatal("Expected 2 wants")
+	}
+	if wbg.count != 2 {
+		t.Fatal("Expected 2 want-blocks")
+	}
+}
+
+func TestStatsRemovePeerOverlappingWantBlockWantHave(t *testing.T) {
+	g := &gauge{}
+	wbg := &gauge{}
+	pwm := newPeerWantManager(g, wbg)
+
+	peers := testutil.GeneratePeers(2)
+	p0 := peers[0]
+	p1 := peers[1]
+	cids := testutil.GenerateCids(2)
+	cids2 := testutil.GenerateCids(2)
+
+	pwm.addPeer(&mockPQ{}, p0)
+	pwm.addPeer(&mockPQ{}, p1)
+
+	// Send 2 want-blocks and 2 want-haves to p0
+	pwm.sendWants(p0, cids, cids2)
+
+	// Send the opposite:
+	// 2 want-haves and 2 want-blocks to p1
+	pwm.sendWants(p1, cids2, cids)
+
+	if g.count != 4 {
+		t.Fatal("Expected 4 wants")
+	}
+	if wbg.count != 4 {
+		t.Fatal("Expected 4 want-blocks")
+	}
+
+	// Remove p0
+	pwm.removePeer(p0)
+
+	if g.count != 4 {
+		t.Fatal("Expected 4 wants")
+	}
+	if wbg.count != 2 {
+		t.Fatal("Expected 2 want-blocks")
+	}
+}
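
Note: these tests rely on the gauge test double and the mockPQ mock peer queue defined elsewhere in peerwantmanager_test.go, which are not part of this diff. A minimal sketch of the counter the assertions above assume (illustrative only; the actual helper may differ):

// gauge is a simple in-memory test double for the metrics gauge used by
// peerWantManager: Inc and Dec just adjust a running count that the tests
// read back via g.count. (Sketch for illustration; the real helper lives
// elsewhere in the test file.)
type gauge struct {
	count int
}

func (g *gauge) Inc() {
	g.count++
}

func (g *gauge) Dec() {
	g.count--
}
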