Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#405 from ipfs/fix/register-session
Browse files Browse the repository at this point in the history
Ensure sessions register with PeerManager

This commit was moved from ipfs/go-bitswap@1910e21
  • Loading branch information
Stebalien authored Jun 3, 2020
2 parents 55ed620 + 8ebd663 commit 6f58a27
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
6 changes: 5 additions & 1 deletion bitswap/internal/session/sessionwantsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ func (sws *sessionWantSender) onChange(changes []change) {
// If the update includes blocks or haves, treat it as signaling that
// the peer is available
if len(chng.update.ks) > 0 || len(chng.update.haves) > 0 {
availability[chng.update.from] = true
p := chng.update.from
availability[p] = true

// Register with the PeerManager
sws.pm.RegisterSession(p, sws)
}

updates = append(updates, chng.update)
Expand Down
50 changes: 50 additions & 0 deletions bitswap/internal/session/sessionwantsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ func (pm *mockPeerManager) RegisterSession(p peer.ID, sess bspm.Session) bool {
return true
}

func (pm *mockPeerManager) has(p peer.ID, sid uint64) bool {
pm.lk.Lock()
defer pm.lk.Unlock()

if session, ok := pm.peerSessions[p]; ok {
return session.ID() == sid
}
return false
}

func (*mockPeerManager) UnregisterSession(uint64) {}
func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {}
Expand Down Expand Up @@ -324,6 +334,46 @@ func TestCancelWants(t *testing.T) {
}
}

func TestRegisterSessionWithPeerManager(t *testing.T) {
cids := testutil.GenerateCids(2)
peers := testutil.GeneratePeers(2)
peerA := peers[0]
peerB := peers[1]
sid := uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
swc := newMockSessionMgr()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted)
defer spm.Shutdown()

go spm.Run()

// peerA: HAVE cid0
spm.Update(peerA, nil, cids[:1], nil)

// Wait for processing to complete
time.Sleep(10 * time.Millisecond)

// Expect session to have been registered with PeerManager
if !pm.has(peerA, sid) {
t.Fatal("Expected HAVE to register session with PeerManager")
}

// peerB: block cid1
spm.Update(peerB, cids[1:], nil, nil)

// Wait for processing to complete
time.Sleep(10 * time.Millisecond)

// Expect session to have been registered with PeerManager
if !pm.has(peerB, sid) {
t.Fatal("Expected HAVE to register session with PeerManager")
}
}

func TestPeerUnavailable(t *testing.T) {
cids := testutil.GenerateCids(2)
peers := testutil.GeneratePeers(2)
Expand Down

0 comments on commit 6f58a27

Please sign in to comment.