Skip to content

Commit

Permalink
connmgr: use clock interface (#1720)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoPolo authored Aug 26, 2022
1 parent 423eab2 commit 37f1230
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 18 deletions.
27 changes: 16 additions & 11 deletions p2p/net/connmgr/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync/atomic"
"time"

"github.com/benbjohnson/clock"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -27,6 +28,8 @@ var log = logging.Logger("connmgr")
type BasicConnMgr struct {
*decayer

clock clock.Clock

cfg *config
segments segments

Expand Down Expand Up @@ -74,15 +77,15 @@ func (ss *segments) countPeers() (count int) {
return count
}

func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
func (s *segment) tagInfoFor(p peer.ID, now time.Time) *peerInfo {
pi, ok := s.peers[p]
if ok {
return pi
}
// create a temporary peer to buffer early tags before the Connected notification arrives.
pi = &peerInfo{
id: p,
firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
firstSeen: now, // this timestamp will be updated when the first Connected notification arrives.
temp: true,
tags: make(map[string]int),
decaying: make(map[*decayingTag]*connmgr.DecayingValue),
Expand All @@ -102,6 +105,7 @@ func NewConnManager(low, hi int, opts ...Option) (*BasicConnMgr, error) {
lowWater: low,
gracePeriod: time.Minute,
silencePeriod: 10 * time.Second,
clock: clock.New(),
}
for _, o := range opts {
if err := o(cfg); err != nil {
Expand All @@ -116,6 +120,7 @@ func NewConnManager(low, hi int, opts ...Option) (*BasicConnMgr, error) {

cm := &BasicConnMgr{
cfg: cfg,
clock: cfg.clock,
protected: make(map[peer.ID]map[string]struct{}, 16),
segments: func() (ret segments) {
for i := range ret {
Expand Down Expand Up @@ -167,7 +172,7 @@ func (cm *BasicConnMgr) memoryEmergency() {

// finally, update the last trim time.
cm.lastTrimMu.Lock()
cm.lastTrim = time.Now()
cm.lastTrim = cm.clock.Now()
cm.lastTrimMu.Unlock()
}

Expand Down Expand Up @@ -311,7 +316,7 @@ func (cm *BasicConnMgr) background() {
interval = cm.cfg.silencePeriod
}

ticker := time.NewTicker(interval)
ticker := cm.clock.Ticker(interval)
defer ticker.Stop()

for {
Expand All @@ -336,7 +341,7 @@ func (cm *BasicConnMgr) doTrim() {
if count == atomic.LoadUint64(&cm.trimCount) {
cm.trim()
cm.lastTrimMu.Lock()
cm.lastTrim = time.Now()
cm.lastTrim = cm.clock.Now()
cm.lastTrimMu.Unlock()
atomic.AddUint64(&cm.trimCount, 1)
}
Expand Down Expand Up @@ -427,7 +432,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {

candidates := make(peerInfos, 0, cm.segments.countPeers())
var ncandidates int
gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)
gracePeriodStart := cm.clock.Now().Add(-cm.cfg.gracePeriod)

cm.plk.RLock()
for _, s := range cm.segments {
Expand Down Expand Up @@ -529,7 +534,7 @@ func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) {
s.Lock()
defer s.Unlock()

pi := s.tagInfoFor(p)
pi := s.tagInfoFor(p, cm.clock.Now())

// Update the total value of the peer.
pi.value += val - pi.tags[tag]
Expand Down Expand Up @@ -559,7 +564,7 @@ func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) {
s.Lock()
defer s.Unlock()

pi := s.tagInfoFor(p)
pi := s.tagInfoFor(p, cm.clock.Now())

oldval := pi.tags[tag]
newval := upsert(oldval)
Expand Down Expand Up @@ -629,7 +634,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
if !ok {
pinfo = &peerInfo{
id: id,
firstSeen: time.Now(),
firstSeen: cm.clock.Now(),
tags: make(map[string]int),
decaying: make(map[*decayingTag]*connmgr.DecayingValue),
conns: make(map[network.Conn]time.Time),
Expand All @@ -640,7 +645,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
// Connected notification arrived: flip the temporary flag, and update the firstSeen
// timestamp to the real one.
pinfo.temp = false
pinfo.firstSeen = time.Now()
pinfo.firstSeen = cm.clock.Now()
}

_, ok = pinfo.conns[c]
Expand All @@ -649,7 +654,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
return
}

pinfo.conns[c] = time.Now()
pinfo.conns[c] = cm.clock.Now()
atomic.AddInt32(&cm.connCount, 1)
}

Expand Down
13 changes: 8 additions & 5 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -411,7 +412,8 @@ func TestDisconnected(t *testing.T) {

func TestGracePeriod(t *testing.T) {
const gp = 100 * time.Millisecond
cm, err := NewConnManager(10, 20, WithGracePeriod(gp), WithSilencePeriod(time.Hour))
mockClock := clock.NewMock()
cm, err := NewConnManager(10, 20, WithGracePeriod(gp), WithSilencePeriod(time.Hour), WithClock(mockClock))
require.NoError(t, err)
defer cm.Close()

Expand All @@ -425,7 +427,7 @@ func TestGracePeriod(t *testing.T) {
conns = append(conns, rc)
not.Connected(nil, rc)

time.Sleep(2 * gp)
mockClock.Add(2 * gp)

if rc.(*tconn).isClosed() {
t.Fatal("expected conn to remain open")
Expand All @@ -447,7 +449,7 @@ func TestGracePeriod(t *testing.T) {
}
}

time.Sleep(200 * time.Millisecond)
mockClock.Add(200 * time.Millisecond)

cm.TrimOpenConns(context.Background())

Expand All @@ -465,7 +467,8 @@ func TestGracePeriod(t *testing.T) {

// see https://github.com/libp2p/go-libp2p-connmgr/issues/23
func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
cm, err := NewConnManager(10, 20, WithGracePeriod(0))
mockClock := clock.NewMock()
cm, err := NewConnManager(10, 20, WithGracePeriod(0), WithClock(mockClock))
require.NoError(t, err)
defer cm.Close()
not := cm.Notifee()
Expand All @@ -480,7 +483,7 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
}

// wait for a few seconds
time.Sleep(time.Second * 3)
mockClock.Add(3 * time.Second)

// only the first trim is allowed in; make sure we close at most 20 connections, not all of them.
var closed int
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/connmgr/decay.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (d *decayer) process() {
s := d.mgr.segments.get(peer)
s.Lock()

p := s.tagInfoFor(peer)
p := s.tagInfoFor(peer, d.clock.Now())
v, ok := p.decaying[tag]
if !ok {
v = &connmgr.DecayingValue{
Expand All @@ -244,7 +244,7 @@ func (d *decayer) process() {
s := d.mgr.segments.get(rm.peer)
s.Lock()

p := s.tagInfoFor(rm.peer)
p := s.tagInfoFor(rm.peer, d.clock.Now())
v, ok := p.decaying[rm.tag]
if !ok {
s.Unlock()
Expand Down
11 changes: 11 additions & 0 deletions p2p/net/connmgr/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package connmgr
import (
"errors"
"time"

"github.com/benbjohnson/clock"
)

// config is the configuration struct for the basic connection manager.
Expand All @@ -13,6 +15,7 @@ type config struct {
silencePeriod time.Duration
decayer *DecayerCfg
emergencyTrim bool
clock clock.Clock
}

// Option represents an option for the basic connection manager.
Expand All @@ -26,6 +29,14 @@ func DecayerConfig(opts *DecayerCfg) Option {
}
}

// WithClock sets the internal clock impl
func WithClock(c clock.Clock) Option {
return func(cfg *config) error {
cfg.clock = c
return nil
}
}

// WithGracePeriod sets the grace period.
// The grace period is the time a newly opened connection is given before it becomes
// subject to pruning.
Expand Down

0 comments on commit 37f1230

Please sign in to comment.