diff --git a/p2p/net/connmgr/connmgr.go b/p2p/net/connmgr/connmgr.go index 22b83c44eb..c3587ad4e3 100644 --- a/p2p/net/connmgr/connmgr.go +++ b/p2p/net/connmgr/connmgr.go @@ -243,19 +243,11 @@ type peerInfo struct { type peerInfos []peerInfo -func (p peerInfos) SortByValue() { - sort.Slice(p, func(i, j int) bool { - left, right := p[i], p[j] - // temporary peers are preferred for pruning. - if left.temp != right.temp { - return left.temp - } - // otherwise, compare by value. - return left.value < right.value - }) -} - -func (p peerInfos) SortByValueAndStreams() { +// SortByValueAndStreams sorts peerInfos by their value and stream count. It +// will sort peers with no streams before those with streams (all else being +// equal). If `sortByMoreStreams` is true it will sort peers with more streams +// before those with fewer streams. This is useful to prioritize freeing memory. +func (p peerInfos) SortByValueAndStreams(sortByMoreStreams bool) { sort.Slice(p, func(i, j int) bool { left, right := p[i], p[j] // temporary peers are preferred for pruning. @@ -278,12 +270,21 @@ func (p peerInfos) SortByValueAndStreams() { } leftIncoming, leftStreams := incomingAndStreams(left.conns) rightIncoming, rightStreams := incomingAndStreams(right.conns) + // prefer closing inactive connections (no streams open) + if rightStreams != leftStreams && (leftStreams == 0 || rightStreams == 0) { + return leftStreams < rightStreams + } // incoming connections are preferred for pruning if leftIncoming != rightIncoming { return leftIncoming } - // prune connections with a higher number of streams first - return rightStreams < leftStreams + + if sortByMoreStreams { + // prune connections with a higher number of streams first + return rightStreams < leftStreams + } else { + return leftStreams < rightStreams + } }) } @@ -368,7 +369,7 @@ func (cm *BasicConnMgr) getConnsToCloseEmergency(target int) []network.Conn { cm.plk.RUnlock() // Sort peers according to their value. - candidates.SortByValueAndStreams() + candidates.SortByValueAndStreams(true) selected := make([]network.Conn, 0, target+10) for _, inf := range candidates { @@ -398,7 +399,7 @@ func (cm *BasicConnMgr) getConnsToCloseEmergency(target int) []network.Conn { } cm.plk.RUnlock() - candidates.SortByValueAndStreams() + candidates.SortByValueAndStreams(true) for _, inf := range candidates { if target <= 0 { break @@ -459,7 +460,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn { } // Sort peers according to their value. - candidates.SortByValue() + candidates.SortByValueAndStreams(false) target := ncandidates - cm.cfg.lowWater diff --git a/p2p/net/connmgr/connmgr_test.go b/p2p/net/connmgr/connmgr_test.go index c2db206cb4..92c273c5f0 100644 --- a/p2p/net/connmgr/connmgr_test.go +++ b/p2p/net/connmgr/connmgr_test.go @@ -40,6 +40,15 @@ func (c *tconn) RemotePeer() peer.ID { return c.peer } +func (c *tconn) Stat() network.ConnStats { + return network.ConnStats{ + Stats: network.Stats{ + Direction: network.DirOutbound, + }, + NumStreams: 1, + } +} + func (c *tconn) RemoteMultiaddr() ma.Multiaddr { addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234") if err != nil { @@ -800,7 +809,7 @@ func TestPeerInfoSorting(t *testing.T) { p1 := peerInfo{id: peer.ID("peer1")} p2 := peerInfo{id: peer.ID("peer2"), temp: true} pis := peerInfos{p1, p2} - pis.SortByValue() + pis.SortByValueAndStreams(false) require.Equal(t, pis, peerInfos{p2, p1}) }) @@ -808,31 +817,67 @@ func TestPeerInfoSorting(t *testing.T) { p1 := peerInfo{id: peer.ID("peer1"), value: 40} p2 := peerInfo{id: peer.ID("peer2"), value: 20} pis := peerInfos{p1, p2} - pis.SortByValue() + pis.SortByValueAndStreams(false) require.Equal(t, pis, peerInfos{p2, p1}) }) - t.Run("in a memory emergency, starts with incoming connections", func(t *testing.T) { + t.Run("prefer peers with no streams", func(t *testing.T) { + p1 := peerInfo{id: peer.ID("peer1"), + conns: map[network.Conn]time.Time{ + &mockConn{stats: network.ConnStats{NumStreams: 0}}: time.Now(), + }, + } + p2 := peerInfo{id: peer.ID("peer2"), + conns: map[network.Conn]time.Time{ + &mockConn{stats: network.ConnStats{NumStreams: 1}}: time.Now(), + }, + } + pis := peerInfos{p2, p1} + pis.SortByValueAndStreams(false) + require.Equal(t, pis, peerInfos{p1, p2}) + }) + + t.Run("in a memory emergency, starts with incoming connections and higher streams", func(t *testing.T) { incoming := network.ConnStats{} incoming.Direction = network.DirInbound outgoing := network.ConnStats{} outgoing.Direction = network.DirOutbound + + outgoingSomeStreams := network.ConnStats{Stats: network.Stats{Direction: network.DirOutbound}, NumStreams: 1} + outgoingMoreStreams := network.ConnStats{Stats: network.Stats{Direction: network.DirOutbound}, NumStreams: 2} p1 := peerInfo{ id: peer.ID("peer1"), conns: map[network.Conn]time.Time{ - &mockConn{stats: outgoing}: time.Now(), + &mockConn{stats: outgoingSomeStreams}: time.Now(), }, } p2 := peerInfo{ id: peer.ID("peer2"), + conns: map[network.Conn]time.Time{ + &mockConn{stats: outgoingSomeStreams}: time.Now(), + &mockConn{stats: incoming}: time.Now(), + }, + } + p3 := peerInfo{ + id: peer.ID("peer3"), conns: map[network.Conn]time.Time{ &mockConn{stats: outgoing}: time.Now(), &mockConn{stats: incoming}: time.Now(), }, } - pis := peerInfos{p1, p2} - pis.SortByValueAndStreams() - require.Equal(t, pis, peerInfos{p2, p1}) + p4 := peerInfo{ + id: peer.ID("peer4"), + conns: map[network.Conn]time.Time{ + &mockConn{stats: outgoingMoreStreams}: time.Now(), + &mockConn{stats: incoming}: time.Now(), + }, + } + pis := peerInfos{p1, p2, p3, p4} + pis.SortByValueAndStreams(true) + // p3 is first because it is inactive (no streams). + // p4 is second because it has the most streams and we priortize killing + // connections with the higher number of streams. + require.Equal(t, pis, peerInfos{p3, p4, p2, p1}) }) t.Run("in a memory emergency, starts with connections that have many streams", func(t *testing.T) { @@ -850,7 +895,7 @@ func TestPeerInfoSorting(t *testing.T) { }, } pis := peerInfos{p1, p2} - pis.SortByValueAndStreams() + pis.SortByValueAndStreams(true) require.Equal(t, pis, peerInfos{p2, p1}) }) }