This repository has been archived by the owner on Apr 21, 2022. It is now read-only.

Fix concurrency and silence period not being honoured #26

Merged 2 commits on Dec 17, 2018
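For readers skimming the diff, the change boils down to two things: trims are now serialized behind a one-slot channel used as a try-lock, and the silence-period check lives inside TrimOpenConns itself, so concurrent callers can no longer race past it. Below is a minimal, self-contained sketch of that pattern, assuming nothing from this package; the names (trimmer, runningCh, trim) are illustrative, not this library's API.

```go
package main

import (
	"fmt"
	"time"
)

// trimmer sketches the pattern this PR introduces: a buffered channel of
// capacity 1 acts as a try-lock so only one trim runs at a time, and a
// silence period suppresses trims that follow the previous one too closely.
type trimmer struct {
	runningCh chan struct{} // one-slot semaphore
	lastTrim  time.Time
	silence   time.Duration
}

func newTrimmer(silence time.Duration) *trimmer {
	return &trimmer{runningCh: make(chan struct{}, 1), silence: silence}
}

// trim runs work unless another trim is in flight or the silence period
// since the last completed trim has not yet elapsed.
func (t *trimmer) trim(work func()) bool {
	select {
	case t.runningCh <- struct{}{}: // acquired the single slot
	default:
		return false // another trim is already in progress
	}
	defer func() { <-t.runningCh }()

	if time.Since(t.lastTrim) < t.silence {
		return false // still within the silence period
	}
	work()
	t.lastTrim = time.Now()
	return true
}

func main() {
	tr := newTrimmer(10 * time.Second)
	fmt.Println(tr.trim(func() { fmt.Println("trimming") })) // trimming, then true
	fmt.Println(tr.trim(func() {}))                          // false: silence period in effect
}
```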
52 changes: 32 additions & 20 deletions connmgr.go
@@ -7,12 +7,14 @@ import (
"time"

logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-interface-connmgr"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
inet "github.com/libp2p/go-libp2p-net"
"github.com/libp2p/go-libp2p-peer"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)

const silencePeriod = 10 * time.Second

var log = logging.Logger("connmgr")

// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the
@@ -23,17 +25,16 @@ var log = logging.Logger("connmgr")
//
// See configuration parameters in NewConnManager.
type BasicConnMgr struct {
highWater int
lowWater int

lk sync.Mutex
highWater int
lowWater int
connCount int
gracePeriod time.Duration
peers map[peer.ID]*peerInfo

peers map[peer.ID]*peerInfo
connCount int

lk sync.Mutex

lastTrim time.Time
// channel-based semaphore that enforces only a single trim is in progress
trimRunningCh chan struct{}
lastTrim time.Time
}

var _ ifconnmgr.ConnManager = (*BasicConnMgr)(nil)
@@ -46,10 +47,11 @@ var _ ifconnmgr.ConnManager = (*BasicConnMgr)(nil)
// subject to pruning.
func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
return &BasicConnMgr{
highWater: hi,
lowWater: low,
gracePeriod: grace,
peers: make(map[peer.ID]*peerInfo),
highWater: hi,
lowWater: low,
gracePeriod: grace,
peers: make(map[peer.ID]*peerInfo),
trimRunningCh: make(chan struct{}, 1),
}
}
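NewConnManager(low, hi, grace) is where all configuration happens (its doc comment is mostly cut off by the hunk above). For orientation, here is a hedged usage sketch pieced together from the calls exercised in the tests below; usageSketch is a hypothetical helper, not part of this package, and in a real node the libp2p host drives the Notifee for you.

```go
// usageSketch is a hypothetical illustration of driving the manager by hand,
// the way the tests below do; pid and conn would come from a live host.
func usageSketch(pid peer.ID, conn inet.Conn) {
	cm := NewConnManager(100, 300, 30*time.Second) // low water, high water, grace period

	// Tag peers so the trimmer keeps the valuable connections the longest.
	cm.TagPeer(pid, "important", 50)

	// The host normally calls the inet.Notifiee returned by Notifee() as
	// connections open and close; here it is invoked directly.
	not := cm.Notifee()
	not.Connected(nil, conn)

	// A trim can also be requested explicitly; after this PR it bails out
	// early if another trim is running or the silence period has not elapsed.
	cm.TrimOpenConns(context.Background())
}
```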

@@ -67,13 +69,27 @@ type peerInfo struct {
// equal the low watermark. Peers are sorted in ascending order based on their total value,
// pruning those peers with the lowest scores first, as long as they are not within their
// grace period.
//
// TODO: error return value so we can cleanly signal we are aborting because:
// (a) there's another trim in progress, or (b) the silence period is in effect.
func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
select {
case cm.trimRunningCh <- struct{}{}:
default:
return
}
defer func() { <-cm.trimRunningCh }()
if time.Since(cm.lastTrim) < silencePeriod {
// skip this attempt to trim as the last one just took place.
return
}
defer log.EventBegin(ctx, "connCleanup").Done()
for _, c := range cm.getConnsToClose(ctx) {
log.Info("closing conn: ", c.RemotePeer())
log.Event(ctx, "closeConn", c.RemotePeer())
c.Close()
}
cm.lastTrim = time.Now()
}

// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
@@ -87,8 +103,6 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn {
return nil
}
now := time.Now()
cm.lastTrim = now

if len(cm.peers) < cm.lowWater {
log.Info("open connection count below limit")
return nil
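The body of getConnsToClose is mostly unchanged, so the diff only shows the early-return path and the removal of the lastTrim update (which now happens in TrimOpenConns). For completeness, here is a simplified, self-contained sketch of the selection heuristic the TrimOpenConns doc comment describes; the types and field names are illustrative, not this package's.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// peerEntry is an illustrative stand-in for this package's peerInfo.
type peerEntry struct {
	id        string
	value     int       // sum of tag values
	firstSeen time.Time // start of the grace period
	conns     int       // open connections to this peer
}

// connsToClose sketches the heuristic: skip peers still inside their grace
// period, sort the rest by ascending value, and pick victims until the
// connection count would drop to the low watermark.
func connsToClose(peers []peerEntry, connCount, lowWater int, grace time.Duration) []string {
	now := time.Now()

	var candidates []peerEntry
	for _, p := range peers {
		if now.Sub(p.firstSeen) < grace {
			continue // still protected by the grace period
		}
		candidates = append(candidates, p)
	}

	// Lowest-value peers are pruned first.
	sort.Slice(candidates, func(i, j int) bool { return candidates[i].value < candidates[j].value })

	var victims []string
	for _, p := range candidates {
		if connCount <= lowWater {
			break
		}
		victims = append(victims, p.id)
		connCount -= p.conns
	}
	return victims
}

func main() {
	now := time.Now()
	peers := []peerEntry{
		{id: "A", value: 50, firstSeen: now.Add(-time.Hour), conns: 1},
		{id: "B", value: 1, firstSeen: now.Add(-time.Hour), conns: 1},
		{id: "C", value: 10, firstSeen: now, conns: 1}, // within grace period
	}
	fmt.Println(connsToClose(peers, 3, 2, time.Minute)) // [B]
}
```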
@@ -263,9 +277,7 @@ func (nn *cmNotifee) Connected(n inet.Network, c inet.Conn) {
cm.connCount++

if cm.connCount > nn.highWater {
if cm.lastTrim.IsZero() || time.Since(cm.lastTrim) > time.Second*10 {
go cm.TrimOpenConns(context.Background())
}
go cm.TrimOpenConns(context.Background())
}
}

61 changes: 49 additions & 12 deletions connmgr_test.go
@@ -13,12 +13,17 @@ import (

type tconn struct {
inet.Conn
peer peer.ID
closed bool

peer peer.ID
closed bool
disconnectNotify func(net inet.Network, conn inet.Conn)
}

func (c *tconn) Close() error {
c.closed = true
if c.disconnectNotify != nil {
c.disconnectNotify(nil, c)
}
return nil
}

@@ -34,9 +39,9 @@ func (c *tconn) RemoteMultiaddr() ma.Multiaddr {
return addr
}

func randConn(t *testing.T) inet.Conn {
func randConn(t *testing.T, discNotify func(inet.Network, inet.Conn)) inet.Conn {
pid := tu.RandPeerIDFatal(t)
return &tconn{peer: pid}
return &tconn{peer: pid, disconnectNotify: discNotify}
}

func TestConnTrimming(t *testing.T) {
@@ -45,7 +50,7 @@ func TestConnTrimming(t *testing.T) {

var conns []inet.Conn
for i := 0; i < 300; i++ {
rc := randConn(t)
rc := randConn(t, nil)
conns = append(conns, rc)
not.Connected(nil, rc)
}
@@ -98,7 +103,7 @@ func TestConnsToClose(t *testing.T) {
cm = NewConnManager(1, 1, time.Duration(10*time.Minute))
not := cm.Notifee()
for i := 0; i < 5; i++ {
conn := randConn(t)
conn := randConn(t, nil)
not.Connected(nil, conn)
}
conns = cm.getConnsToClose(context.Background())
@@ -111,7 +116,7 @@ func TestGetTagInfo(t *testing.T) {
start := time.Now()
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
not := cm.Notifee()
conn := randConn(t)
conn := randConn(t, nil)
not.Connected(nil, conn)
end := time.Now()

@@ -192,7 +197,7 @@ func TestTagPeerNonExistant(t *testing.T) {
func TestUntagPeer(t *testing.T) {
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
not := cm.Notifee()
conn := randConn(t)
conn := randConn(t, nil)
not.Connected(nil, conn)
rp := conn.RemotePeer()
cm.TagPeer(rp, "tag", 5)
@@ -223,7 +228,7 @@ func TestGetInfo(t *testing.T) {
gp := time.Duration(10 * time.Minute)
cm := NewConnManager(1, 5, gp)
not := cm.Notifee()
conn := randConn(t)
conn := randConn(t, nil)
not.Connected(nil, conn)
cm.TrimOpenConns(context.Background())
end := time.Now()
@@ -250,7 +255,7 @@ func TestDoubleConnection(t *testing.T) {
gp := time.Duration(10 * time.Minute)
cm := NewConnManager(1, 5, gp)
not := cm.Notifee()
conn := randConn(t)
conn := randConn(t, nil)
not.Connected(nil, conn)
cm.TagPeer(conn.RemotePeer(), "foo", 10)
not.Connected(nil, conn)
@@ -266,11 +271,11 @@ func TestDisconnected(t *testing.T) {
gp := time.Duration(10 * time.Minute)
cm := NewConnManager(1, 5, gp)
not := cm.Notifee()
conn := randConn(t)
conn := randConn(t, nil)
not.Connected(nil, conn)
cm.TagPeer(conn.RemotePeer(), "foo", 10)

not.Disconnected(nil, randConn(t))
not.Disconnected(nil, randConn(t, nil))
if cm.connCount != 1 {
t.Fatal("unexpected number of connections")
}
@@ -294,3 +299,35 @@ func TestDisconnected(t *testing.T) {
t.Fatal("unexpected number of peers")
}
}

// see https://github.com/libp2p/go-libp2p-connmgr/issues/23
func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
cm := NewConnManager(10, 20, 0)
not := cm.Notifee()

var conns []inet.Conn

// quickly produce 30 connections (sending us above the high watermark)
for i := 0; i < 30; i++ {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
}

// wait for a few seconds
time.Sleep(time.Second * 3)

// only the first trim is allowed in; make sure we close at most 20 connections, not all of them.
var closed int
for _, c := range conns {
if c.(*tconn).closed {
closed++
}
}
if closed > 20 {
t.Fatalf("should have closed at most 20 connections, closed: %d", closed)
}
if total := closed + cm.connCount; total != 30 {
t.Fatalf("expected closed connections + open conn count to equal 30, value: %d", total)
}
}