diff --git a/bitswap/internal/session/sessionwantsender.go b/bitswap/internal/session/sessionwantsender.go index 094d9096b..036a7e910 100644 --- a/bitswap/internal/session/sessionwantsender.go +++ b/bitswap/internal/session/sessionwantsender.go @@ -271,7 +271,11 @@ func (sws *sessionWantSender) onChange(changes []change) { // If the update includes blocks or haves, treat it as signaling that // the peer is available if len(chng.update.ks) > 0 || len(chng.update.haves) > 0 { - availability[chng.update.from] = true + p := chng.update.from + availability[p] = true + + // Register with the PeerManager + sws.pm.RegisterSession(p, sws) } updates = append(updates, chng.update) diff --git a/bitswap/internal/session/sessionwantsender_test.go b/bitswap/internal/session/sessionwantsender_test.go index 6c3059c1f..a36eb432e 100644 --- a/bitswap/internal/session/sessionwantsender_test.go +++ b/bitswap/internal/session/sessionwantsender_test.go @@ -66,6 +66,16 @@ func (pm *mockPeerManager) RegisterSession(p peer.ID, sess bspm.Session) bool { return true } +func (pm *mockPeerManager) has(p peer.ID, sid uint64) bool { + pm.lk.Lock() + defer pm.lk.Unlock() + + if session, ok := pm.peerSessions[p]; ok { + return session.ID() == sid + } + return false +} + func (*mockPeerManager) UnregisterSession(uint64) {} func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {} @@ -324,6 +334,46 @@ func TestCancelWants(t *testing.T) { } } +func TestRegisterSessionWithPeerManager(t *testing.T) { + cids := testutil.GenerateCids(2) + peers := testutil.GeneratePeers(2) + peerA := peers[0] + peerB := peers[1] + sid := uint64(1) + pm := newMockPeerManager() + fpm := newFakeSessionPeerManager() + swc := newMockSessionMgr() + bpm := bsbpm.New() + onSend := func(peer.ID, []cid.Cid, []cid.Cid) {} + onPeersExhausted := func([]cid.Cid) {} + spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted) + defer spm.Shutdown() + + go spm.Run() + + // peerA: HAVE cid0 + spm.Update(peerA, nil, cids[:1], nil) + + // Wait for processing to complete + time.Sleep(10 * time.Millisecond) + + // Expect session to have been registered with PeerManager + if !pm.has(peerA, sid) { + t.Fatal("Expected HAVE to register session with PeerManager") + } + + // peerB: block cid1 + spm.Update(peerB, cids[1:], nil, nil) + + // Wait for processing to complete + time.Sleep(10 * time.Millisecond) + + // Expect session to have been registered with PeerManager + if !pm.has(peerB, sid) { + t.Fatal("Expected HAVE to register session with PeerManager") + } +} + func TestPeerUnavailable(t *testing.T) { cids := testutil.GenerateCids(2) peers := testutil.GeneratePeers(2)