From 7087c2d8ee15d400420c4a61196f7a6078f406a3 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 27 Apr 2020 16:25:50 -0400 Subject: [PATCH 1/2] refactor: customize SessionInterestManager --- .../sessioninterestmanager.go | 127 +++++++-- internal/sessionwantlist/sessionwantlist.go | 141 ---------- .../sessionwantlist/sessionwantlist_test.go | 258 ------------------ 3 files changed, 106 insertions(+), 420 deletions(-) delete mode 100644 internal/sessionwantlist/sessionwantlist.go delete mode 100644 internal/sessionwantlist/sessionwantlist_test.go diff --git a/internal/sessioninterestmanager/sessioninterestmanager.go b/internal/sessioninterestmanager/sessioninterestmanager.go index ff0d1230..afb62516 100644 --- a/internal/sessioninterestmanager/sessioninterestmanager.go +++ b/internal/sessioninterestmanager/sessioninterestmanager.go @@ -3,7 +3,6 @@ package sessioninterestmanager import ( "sync" - bsswl "github.com/ipfs/go-bitswap/internal/sessionwantlist" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" @@ -11,16 +10,22 @@ import ( // SessionInterestManager records the CIDs that each session is interested in. type SessionInterestManager struct { - lk sync.RWMutex - interested *bsswl.SessionWantlist - wanted *bsswl.SessionWantlist + lk sync.RWMutex + wants map[cid.Cid]map[uint64]bool } // New initializes a new SessionInterestManager. func New() *SessionInterestManager { return &SessionInterestManager{ - interested: bsswl.NewSessionWantlist(), - wanted: bsswl.NewSessionWantlist(), + // Map of cids -> sessions -> bool + // + // The boolean indicates whether the session still wants the block + // or is just interested in receiving messages about it. + // + // Note that once the block is received the session no longer wants + // the block, but still wants to receive messages from peers who have + // the block as they may have other blocks the session is interested in. + wants: make(map[cid.Cid]map[uint64]bool), } } @@ -30,8 +35,15 @@ func (sim *SessionInterestManager) RecordSessionInterest(ses uint64, ks []cid.Ci sim.lk.Lock() defer sim.lk.Unlock() - sim.interested.Add(ks, ses) - sim.wanted.Add(ks, ses) + // For each key + for _, c := range ks { + // If there is no list of sessions for the key, create one + if _, ok := sim.wants[c]; !ok { + sim.wants[c] = make(map[uint64]bool) + } + // Record that the session wants the block + sim.wants[c][ses] = true + } } // When the session shuts down it calls RemoveSessionInterest(). @@ -40,26 +52,68 @@ func (sim *SessionInterestManager) RemoveSession(ses uint64) []cid.Cid { sim.lk.Lock() defer sim.lk.Unlock() - sim.wanted.RemoveSession(ses) - return sim.interested.RemoveSession(ses) + // The keys that no session is interested in + deletedKs := make([]cid.Cid, 0) + + // For each known key + for c := range sim.wants { + // Remove the session from the list of sessions that want the key + delete(sim.wants[c], ses) + + // If there are no more sessions that want the key + if len(sim.wants[c]) == 0 { + // Clean up the list memory + delete(sim.wants, c) + // Add the key to the list of keys that no session is interested in + deletedKs = append(deletedKs, c) + } + } + + return deletedKs } // When the session receives blocks, it calls RemoveSessionWants(). -func (sim *SessionInterestManager) RemoveSessionWants(ses uint64, wants []cid.Cid) { +func (sim *SessionInterestManager) RemoveSessionWants(ses uint64, ks []cid.Cid) { sim.lk.Lock() defer sim.lk.Unlock() - sim.wanted.RemoveSessionKeys(ses, wants) + // For each key + for _, c := range ks { + // If the session wanted the block + if wanted, ok := sim.wants[c][ses]; ok && wanted { + // Mark the block as unwanted + sim.wants[c][ses] = false + } + } } // When a request is cancelled, the session calls RemoveSessionInterested(). // Returns the keys that no session is interested in any more. -func (sim *SessionInterestManager) RemoveSessionInterested(ses uint64, wants []cid.Cid) []cid.Cid { +func (sim *SessionInterestManager) RemoveSessionInterested(ses uint64, ks []cid.Cid) []cid.Cid { sim.lk.Lock() defer sim.lk.Unlock() - sim.wanted.RemoveSessionKeys(ses, wants) - return sim.interested.RemoveSessionKeys(ses, wants) + // The keys that no session is interested in + deletedKs := make([]cid.Cid, 0, len(ks)) + + // For each key + for _, c := range ks { + // If there is a list of sessions that want the key + if _, ok := sim.wants[c]; ok { + // Remove the session from the list of sessions that want the key + delete(sim.wants[c], ses) + + // If there are no more sessions that want the key + if len(sim.wants[c]) == 0 { + // Clean up the list memory + delete(sim.wants, c) + // Add the key to the list of keys that no session is interested in + deletedKs = append(deletedKs, c) + } + } + } + + return deletedKs } // The session calls FilterSessionInterested() to filter the sets of keys for @@ -68,9 +122,20 @@ func (sim *SessionInterestManager) FilterSessionInterested(ses uint64, ksets ... sim.lk.RLock() defer sim.lk.RUnlock() + // For each set of keys kres := make([][]cid.Cid, len(ksets)) for i, ks := range ksets { - kres[i] = sim.interested.SessionHas(ses, ks).Keys() + // The set of keys that at least one session is interested in + has := make([]cid.Cid, 0, len(ks)) + + // For each key in the list + for _, c := range ks { + // If there is a session that's interested, add the key to the set + if _, ok := sim.wants[c][ses]; ok { + has = append(has, c) + } + } + kres[i] = has } return kres } @@ -81,12 +146,19 @@ func (sim *SessionInterestManager) SplitWantedUnwanted(blks []blocks.Block) ([]b sim.lk.RLock() defer sim.lk.RUnlock() - // Get the wanted block keys - ks := make([]cid.Cid, len(blks)) + // Get the wanted block keys as a set + wantedKs := cid.NewSet() for _, b := range blks { - ks = append(ks, b.Cid()) + c := b.Cid() + // For each session that is interested in the key + for ses := range sim.wants[c] { + // If the session wants the key (rather than just being interested) + if wanted, ok := sim.wants[c][ses]; ok && wanted { + // Add the key to the set + wantedKs.Add(c) + } + } } - wantedKs := sim.wanted.Has(ks) // Separate the blocks into wanted and unwanted wantedBlks := make([]blocks.Block, 0, len(blks)) @@ -112,5 +184,18 @@ func (sim *SessionInterestManager) InterestedSessions(blks []cid.Cid, haves []ci ks = append(ks, haves...) ks = append(ks, dontHaves...) - return sim.interested.SessionsFor(ks) + // Create a set of sessions that are interested in the keys + sesSet := make(map[uint64]struct{}) + for _, c := range ks { + for s := range sim.wants[c] { + sesSet[s] = struct{}{} + } + } + + // Convert the set into a list + ses := make([]uint64, 0, len(sesSet)) + for s := range sesSet { + ses = append(ses, s) + } + return ses } diff --git a/internal/sessionwantlist/sessionwantlist.go b/internal/sessionwantlist/sessionwantlist.go deleted file mode 100644 index 7651b7fe..00000000 --- a/internal/sessionwantlist/sessionwantlist.go +++ /dev/null @@ -1,141 +0,0 @@ -package sessionwantlist - -import ( - "sync" - - cid "github.com/ipfs/go-cid" -) - -// The SessionWantList keeps track of which sessions want a CID -type SessionWantlist struct { - sync.RWMutex - wants map[cid.Cid]map[uint64]struct{} -} - -func NewSessionWantlist() *SessionWantlist { - return &SessionWantlist{ - wants: make(map[cid.Cid]map[uint64]struct{}), - } -} - -// The given session wants the keys -func (swl *SessionWantlist) Add(ks []cid.Cid, ses uint64) { - swl.Lock() - defer swl.Unlock() - - for _, c := range ks { - if _, ok := swl.wants[c]; !ok { - swl.wants[c] = make(map[uint64]struct{}) - } - swl.wants[c][ses] = struct{}{} - } -} - -// Remove the keys for all sessions. -// Called when blocks are received. -func (swl *SessionWantlist) RemoveKeys(ks []cid.Cid) { - swl.Lock() - defer swl.Unlock() - - for _, c := range ks { - delete(swl.wants, c) - } -} - -// Remove the session's wants, and return wants that are no longer wanted by -// any session. -func (swl *SessionWantlist) RemoveSession(ses uint64) []cid.Cid { - swl.Lock() - defer swl.Unlock() - - deletedKs := make([]cid.Cid, 0) - for c := range swl.wants { - delete(swl.wants[c], ses) - if len(swl.wants[c]) == 0 { - delete(swl.wants, c) - deletedKs = append(deletedKs, c) - } - } - - return deletedKs -} - -// Remove the session's wants -func (swl *SessionWantlist) RemoveSessionKeys(ses uint64, ks []cid.Cid) []cid.Cid { - swl.Lock() - defer swl.Unlock() - - deletedKs := make([]cid.Cid, 0, len(ks)) - for _, c := range ks { - if _, ok := swl.wants[c]; ok { - delete(swl.wants[c], ses) - if len(swl.wants[c]) == 0 { - delete(swl.wants, c) - deletedKs = append(deletedKs, c) - } - } - } - - return deletedKs -} - -// All keys wanted by all sessions -func (swl *SessionWantlist) Keys() []cid.Cid { - swl.RLock() - defer swl.RUnlock() - - ks := make([]cid.Cid, 0, len(swl.wants)) - for c := range swl.wants { - ks = append(ks, c) - } - return ks -} - -// All sessions that want the given keys -func (swl *SessionWantlist) SessionsFor(ks []cid.Cid) []uint64 { - swl.RLock() - defer swl.RUnlock() - - sesMap := make(map[uint64]struct{}) - for _, c := range ks { - for s := range swl.wants[c] { - sesMap[s] = struct{}{} - } - } - - ses := make([]uint64, 0, len(sesMap)) - for s := range sesMap { - ses = append(ses, s) - } - return ses -} - -// Filter for keys that at least one session wants -func (swl *SessionWantlist) Has(ks []cid.Cid) *cid.Set { - swl.RLock() - defer swl.RUnlock() - - has := cid.NewSet() - for _, c := range ks { - if _, ok := swl.wants[c]; ok { - has.Add(c) - } - } - return has -} - -// Filter for keys that the given session wants -func (swl *SessionWantlist) SessionHas(ses uint64, ks []cid.Cid) *cid.Set { - swl.RLock() - defer swl.RUnlock() - - has := cid.NewSet() - for _, c := range ks { - if sesMap, cok := swl.wants[c]; cok { - if _, sok := sesMap[ses]; sok { - has.Add(c) - } - } - } - return has -} diff --git a/internal/sessionwantlist/sessionwantlist_test.go b/internal/sessionwantlist/sessionwantlist_test.go deleted file mode 100644 index d57f9395..00000000 --- a/internal/sessionwantlist/sessionwantlist_test.go +++ /dev/null @@ -1,258 +0,0 @@ -package sessionwantlist - -import ( - "os" - "testing" - - "github.com/ipfs/go-bitswap/internal/testutil" - - cid "github.com/ipfs/go-cid" -) - -var c0 cid.Cid -var c1 cid.Cid -var c2 cid.Cid - -const s0 = uint64(0) -const s1 = uint64(1) - -func setup() { - cids := testutil.GenerateCids(3) - c0 = cids[0] - c1 = cids[1] - c2 = cids[2] -} - -func TestMain(m *testing.M) { - setup() - os.Exit(m.Run()) -} - -func TestEmpty(t *testing.T) { - swl := NewSessionWantlist() - - if len(swl.Keys()) != 0 { - t.Fatal("Expected Keys() to be empty") - } - if len(swl.SessionsFor([]cid.Cid{c0})) != 0 { - t.Fatal("Expected SessionsFor() to be empty") - } -} - -func TestSimpleAdd(t *testing.T) { - swl := NewSessionWantlist() - - // s0: c0 - swl.Add([]cid.Cid{c0}, s0) - if len(swl.Keys()) != 1 { - t.Fatal("Expected Keys() to have length 1") - } - if !swl.Keys()[0].Equals(c0) { - t.Fatal("Expected Keys() to be [cid0]") - } - if len(swl.SessionsFor([]cid.Cid{c0})) != 1 { - t.Fatal("Expected SessionsFor() to have length 1") - } - if swl.SessionsFor([]cid.Cid{c0})[0] != s0 { - t.Fatal("Expected SessionsFor() to be [s0]") - } - - // s0: c0, c1 - swl.Add([]cid.Cid{c1}, s0) - if len(swl.Keys()) != 2 { - t.Fatal("Expected Keys() to have length 2") - } - if !testutil.MatchKeysIgnoreOrder(swl.Keys(), []cid.Cid{c0, c1}) { - t.Fatal("Expected Keys() to contain [cid0, cid1]") - } - if len(swl.SessionsFor([]cid.Cid{c0})) != 1 { - t.Fatal("Expected SessionsFor() to have length 1") - } - if swl.SessionsFor([]cid.Cid{c0})[0] != s0 { - t.Fatal("Expected SessionsFor() to be [s0]") - } - - // s0: c0, c1 - // s1: c0 - swl.Add([]cid.Cid{c0}, s1) - if len(swl.Keys()) != 2 { - t.Fatal("Expected Keys() to have length 2") - } - if !testutil.MatchKeysIgnoreOrder(swl.Keys(), []cid.Cid{c0, c1}) { - t.Fatal("Expected Keys() to contain [cid0, cid1]") - } - if len(swl.SessionsFor([]cid.Cid{c0})) != 2 { - t.Fatal("Expected SessionsFor() to have length 2") - } -} - -func TestMultiKeyAdd(t *testing.T) { - swl := NewSessionWantlist() - - // s0: c0, c1 - swl.Add([]cid.Cid{c0, c1}, s0) - if len(swl.Keys()) != 2 { - t.Fatal("Expected Keys() to have length 2") - } - if !testutil.MatchKeysIgnoreOrder(swl.Keys(), []cid.Cid{c0, c1}) { - t.Fatal("Expected Keys() to contain [cid0, cid1]") - } - if len(swl.SessionsFor([]cid.Cid{c0})) != 1 { - t.Fatal("Expected SessionsFor() to have length 1") - } - if swl.SessionsFor([]cid.Cid{c0})[0] != s0 { - t.Fatal("Expected SessionsFor() to be [s0]") - } -} - -func TestSessionHas(t *testing.T) { - swl := NewSessionWantlist() - - if swl.Has([]cid.Cid{c0, c1}).Len() > 0 { - t.Fatal("Expected Has([c0, c1]) to be []") - } - if swl.SessionHas(s0, []cid.Cid{c0, c1}).Len() > 0 { - t.Fatal("Expected SessionHas(s0, [c0, c1]) to be []") - } - - // s0: c0 - swl.Add([]cid.Cid{c0}, s0) - if !matchSet(swl.Has([]cid.Cid{c0, c1}), []cid.Cid{c0}) { - t.Fatal("Expected Has([c0, c1]) to be [c0]") - } - if !matchSet(swl.SessionHas(s0, []cid.Cid{c0, c1}), []cid.Cid{c0}) { - t.Fatal("Expected SessionHas(s0, [c0, c1]) to be [c0]") - } - if swl.SessionHas(s1, []cid.Cid{c0, c1}).Len() > 0 { - t.Fatal("Expected SessionHas(s1, [c0, c1]) to be []") - } - - // s0: c0, c1 - swl.Add([]cid.Cid{c1}, s0) - if !matchSet(swl.Has([]cid.Cid{c0, c1}), []cid.Cid{c0, c1}) { - t.Fatal("Expected Has([c0, c1]) to be [c0, c1]") - } - if !matchSet(swl.SessionHas(s0, []cid.Cid{c0, c1}), []cid.Cid{c0, c1}) { - t.Fatal("Expected SessionHas(s0, [c0, c1]) to be [c0, c1]") - } - - // s0: c0, c1 - // s1: c0 - swl.Add([]cid.Cid{c0}, s1) - if len(swl.Keys()) != 2 { - t.Fatal("Expected Keys() to have length 2") - } - if !matchSet(swl.Has([]cid.Cid{c0, c1}), []cid.Cid{c0, c1}) { - t.Fatal("Expected Has([c0, c1]) to be [c0, c1]") - } - if !matchSet(swl.SessionHas(s0, []cid.Cid{c0, c1}), []cid.Cid{c0, c1}) { - t.Fatal("Expected SessionHas(s0, [c0, c1]) to be [c0, c1]") - } - if !matchSet(swl.SessionHas(s1, []cid.Cid{c0, c1}), []cid.Cid{c0}) { - t.Fatal("Expected SessionHas(s1, [c0, c1]) to be [c0]") - } -} - -func TestSimpleRemoveKeys(t *testing.T) { - swl := NewSessionWantlist() - - // s0: c0, c1 - // s1: c0 - swl.Add([]cid.Cid{c0, c1}, s0) - swl.Add([]cid.Cid{c0}, s1) - - // s0: c1 - swl.RemoveKeys([]cid.Cid{c0}) - if len(swl.Keys()) != 1 { - t.Fatal("Expected Keys() to have length 1") - } - if !swl.Keys()[0].Equals(c1) { - t.Fatal("Expected Keys() to be [cid1]") - } - if len(swl.SessionsFor([]cid.Cid{c0})) != 0 { - t.Fatal("Expected SessionsFor(c0) to be empty") - } - if len(swl.SessionsFor([]cid.Cid{c1})) != 1 { - t.Fatal("Expected SessionsFor(c1) to have length 1") - } - if swl.SessionsFor([]cid.Cid{c1})[0] != s0 { - t.Fatal("Expected SessionsFor(c1) to be [s0]") - } -} - -func TestMultiRemoveKeys(t *testing.T) { - swl := NewSessionWantlist() - - // s0: c0, c1 - // s1: c0 - swl.Add([]cid.Cid{c0, c1}, s0) - swl.Add([]cid.Cid{c0}, s1) - - // - swl.RemoveKeys([]cid.Cid{c0, c1}) - if len(swl.Keys()) != 0 { - t.Fatal("Expected Keys() to be empty") - } - if len(swl.SessionsFor([]cid.Cid{c0})) != 0 { - t.Fatal("Expected SessionsFor() to be empty") - } -} - -func TestRemoveSession(t *testing.T) { - swl := NewSessionWantlist() - - // s0: c0, c1 - // s1: c0 - swl.Add([]cid.Cid{c0, c1}, s0) - swl.Add([]cid.Cid{c0}, s1) - - // s1: c0 - swl.RemoveSession(s0) - if len(swl.Keys()) != 1 { - t.Fatal("Expected Keys() to have length 1") - } - if !swl.Keys()[0].Equals(c0) { - t.Fatal("Expected Keys() to be [cid0]") - } - if len(swl.SessionsFor([]cid.Cid{c1})) != 0 { - t.Fatal("Expected SessionsFor(c1) to be empty") - } - if len(swl.SessionsFor([]cid.Cid{c0})) != 1 { - t.Fatal("Expected SessionsFor(c0) to have length 1") - } - if swl.SessionsFor([]cid.Cid{c0})[0] != s1 { - t.Fatal("Expected SessionsFor(c0) to be [s1]") - } -} - -func TestRemoveSessionKeys(t *testing.T) { - swl := NewSessionWantlist() - - // s0: c0, c1, c2 - // s1: c0 - swl.Add([]cid.Cid{c0, c1, c2}, s0) - swl.Add([]cid.Cid{c0}, s1) - - // s0: c2 - // s1: c0 - swl.RemoveSessionKeys(s0, []cid.Cid{c0, c1}) - if !matchSet(swl.SessionHas(s0, []cid.Cid{c0, c1, c2}), []cid.Cid{c2}) { - t.Fatal("Expected SessionHas(s0, [c0, c1, c2]) to be [c2]") - } - if !matchSet(swl.SessionHas(s1, []cid.Cid{c0, c1, c2}), []cid.Cid{c0}) { - t.Fatal("Expected SessionHas(s1, [c0, c1, c2]) to be [c0]") - } -} - -func matchSet(ks1 *cid.Set, ks2 []cid.Cid) bool { - if ks1.Len() != len(ks2) { - return false - } - - for _, k := range ks2 { - if !ks1.Has(k) { - return false - } - } - return true -} From ca25b01477ee51e9e309ea782d3fe24f1b9ad1b1 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 27 Apr 2020 17:33:39 -0400 Subject: [PATCH 2/2] refactor: SessionInterestManager perf --- .../sessioninterestmanager/sessioninterestmanager.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/sessioninterestmanager/sessioninterestmanager.go b/internal/sessioninterestmanager/sessioninterestmanager.go index afb62516..0ab32ed1 100644 --- a/internal/sessioninterestmanager/sessioninterestmanager.go +++ b/internal/sessioninterestmanager/sessioninterestmanager.go @@ -37,12 +37,12 @@ func (sim *SessionInterestManager) RecordSessionInterest(ses uint64, ks []cid.Ci // For each key for _, c := range ks { - // If there is no list of sessions for the key, create one - if _, ok := sim.wants[c]; !ok { - sim.wants[c] = make(map[uint64]bool) + // Record that the session wants the blocks + if want, ok := sim.wants[c]; ok { + want[ses] = true + } else { + sim.wants[c] = map[uint64]bool{ses: true} } - // Record that the session wants the block - sim.wants[c][ses] = true } }