diff --git a/connmgr.go b/connmgr.go index 8e7d5aa..626fe76 100644 --- a/connmgr.go +++ b/connmgr.go @@ -7,12 +7,14 @@ import ( "time" logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-interface-connmgr" + ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr" inet "github.com/libp2p/go-libp2p-net" - "github.com/libp2p/go-libp2p-peer" + peer "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" ) +const silencePeriod = 10 * time.Second + var log = logging.Logger("connmgr") // BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the @@ -23,17 +25,16 @@ var log = logging.Logger("connmgr") // // See configuration parameters in NewConnManager. type BasicConnMgr struct { - highWater int - lowWater int - + lk sync.Mutex + highWater int + lowWater int + connCount int gracePeriod time.Duration + peers map[peer.ID]*peerInfo - peers map[peer.ID]*peerInfo - connCount int - - lk sync.Mutex - - lastTrim time.Time + // channel-based semaphore that enforces only a single trim is in progress + trimRunningCh chan struct{} + lastTrim time.Time } var _ ifconnmgr.ConnManager = (*BasicConnMgr)(nil) @@ -46,10 +47,11 @@ var _ ifconnmgr.ConnManager = (*BasicConnMgr)(nil) // subject to pruning. func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr { return &BasicConnMgr{ - highWater: hi, - lowWater: low, - gracePeriod: grace, - peers: make(map[peer.ID]*peerInfo), + highWater: hi, + lowWater: low, + gracePeriod: grace, + peers: make(map[peer.ID]*peerInfo), + trimRunningCh: make(chan struct{}, 1), } } @@ -67,13 +69,27 @@ type peerInfo struct { // equal the low watermark. Peers are sorted in ascending order based on their total value, // pruning those peers with the lowest scores first, as long as they are not within their // grace period. +// +// TODO: error return value so we can cleanly signal we are aborting because: +// (a) there's another trim in progress, or (b) the silence period is in effect. func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) { + select { + case cm.trimRunningCh <- struct{}{}: + default: + return + } + defer func() { <-cm.trimRunningCh }() + if time.Since(cm.lastTrim) < silencePeriod { + // skip this attempt to trim as the last one just took place. + return + } defer log.EventBegin(ctx, "connCleanup").Done() for _, c := range cm.getConnsToClose(ctx) { log.Info("closing conn: ", c.RemotePeer()) log.Event(ctx, "closeConn", c.RemotePeer()) c.Close() } + cm.lastTrim = time.Now() } // getConnsToClose runs the heuristics described in TrimOpenConns and returns the @@ -87,8 +103,6 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn { return nil } now := time.Now() - cm.lastTrim = now - if len(cm.peers) < cm.lowWater { log.Info("open connection count below limit") return nil @@ -263,9 +277,7 @@ func (nn *cmNotifee) Connected(n inet.Network, c inet.Conn) { cm.connCount++ if cm.connCount > nn.highWater { - if cm.lastTrim.IsZero() || time.Since(cm.lastTrim) > time.Second*10 { - go cm.TrimOpenConns(context.Background()) - } + go cm.TrimOpenConns(context.Background()) } } diff --git a/connmgr_test.go b/connmgr_test.go index cf11f20..90285c5 100644 --- a/connmgr_test.go +++ b/connmgr_test.go @@ -13,12 +13,17 @@ import ( type tconn struct { inet.Conn - peer peer.ID - closed bool + + peer peer.ID + closed bool + disconnectNotify func(net inet.Network, conn inet.Conn) } func (c *tconn) Close() error { c.closed = true + if c.disconnectNotify != nil { + c.disconnectNotify(nil, c) + } return nil } @@ -34,9 +39,9 @@ func (c *tconn) RemoteMultiaddr() ma.Multiaddr { return addr } -func randConn(t *testing.T) inet.Conn { +func randConn(t *testing.T, discNotify func(inet.Network, inet.Conn)) inet.Conn { pid := tu.RandPeerIDFatal(t) - return &tconn{peer: pid} + return &tconn{peer: pid, disconnectNotify: discNotify} } func TestConnTrimming(t *testing.T) { @@ -45,7 +50,7 @@ func TestConnTrimming(t *testing.T) { var conns []inet.Conn for i := 0; i < 300; i++ { - rc := randConn(t) + rc := randConn(t, nil) conns = append(conns, rc) not.Connected(nil, rc) } @@ -98,7 +103,7 @@ func TestConnsToClose(t *testing.T) { cm = NewConnManager(1, 1, time.Duration(10*time.Minute)) not := cm.Notifee() for i := 0; i < 5; i++ { - conn := randConn(t) + conn := randConn(t, nil) not.Connected(nil, conn) } conns = cm.getConnsToClose(context.Background()) @@ -111,7 +116,7 @@ func TestGetTagInfo(t *testing.T) { start := time.Now() cm := NewConnManager(1, 1, time.Duration(10*time.Minute)) not := cm.Notifee() - conn := randConn(t) + conn := randConn(t, nil) not.Connected(nil, conn) end := time.Now() @@ -192,7 +197,7 @@ func TestTagPeerNonExistant(t *testing.T) { func TestUntagPeer(t *testing.T) { cm := NewConnManager(1, 1, time.Duration(10*time.Minute)) not := cm.Notifee() - conn := randConn(t) + conn := randConn(t, nil) not.Connected(nil, conn) rp := conn.RemotePeer() cm.TagPeer(rp, "tag", 5) @@ -223,7 +228,7 @@ func TestGetInfo(t *testing.T) { gp := time.Duration(10 * time.Minute) cm := NewConnManager(1, 5, gp) not := cm.Notifee() - conn := randConn(t) + conn := randConn(t, nil) not.Connected(nil, conn) cm.TrimOpenConns(context.Background()) end := time.Now() @@ -250,7 +255,7 @@ func TestDoubleConnection(t *testing.T) { gp := time.Duration(10 * time.Minute) cm := NewConnManager(1, 5, gp) not := cm.Notifee() - conn := randConn(t) + conn := randConn(t, nil) not.Connected(nil, conn) cm.TagPeer(conn.RemotePeer(), "foo", 10) not.Connected(nil, conn) @@ -266,11 +271,11 @@ func TestDisconnected(t *testing.T) { gp := time.Duration(10 * time.Minute) cm := NewConnManager(1, 5, gp) not := cm.Notifee() - conn := randConn(t) + conn := randConn(t, nil) not.Connected(nil, conn) cm.TagPeer(conn.RemotePeer(), "foo", 10) - not.Disconnected(nil, randConn(t)) + not.Disconnected(nil, randConn(t, nil)) if cm.connCount != 1 { t.Fatal("unexpected number of connections") } @@ -294,3 +299,35 @@ func TestDisconnected(t *testing.T) { t.Fatal("unexpected number of peers") } } + +// see https://github.com/libp2p/go-libp2p-connmgr/issues/23 +func TestQuickBurstRespectsSilencePeriod(t *testing.T) { + cm := NewConnManager(10, 20, 0) + not := cm.Notifee() + + var conns []inet.Conn + + // quickly produce 30 connections (sending us above the high watermark) + for i := 0; i < 30; i++ { + rc := randConn(t, not.Disconnected) + conns = append(conns, rc) + not.Connected(nil, rc) + } + + // wait for a few seconds + time.Sleep(time.Second * 3) + + // only the first trim is allowed in; make sure we close at most 20 connections, not all of them. + var closed int + for _, c := range conns { + if c.(*tconn).closed { + closed++ + } + } + if closed > 20 { + t.Fatalf("should have closed at most 20 connections, closed: %d", closed) + } + if total := closed + cm.connCount; total != 30 { + t.Fatalf("expected closed connections + open conn count to equal 30, value: %d", total) + } +}