Skip to content

Commit

Permalink
decaying tags: support removal and closure. (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk authored May 15, 2020
1 parent bce720e commit 9fba560
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 18 deletions.
112 changes: 95 additions & 17 deletions p2p/net/connmgr/decay.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ type bumpCmd struct {
delta int
}

// removeCmd represents a tag removal command.
type removeCmd struct {
peer peer.ID
tag *decayingTag
}

// decayer tracks and manages all decaying tags and their values.
type decayer struct {
cfg *DecayerCfg
Expand All @@ -34,8 +40,10 @@ type decayer struct {
// lastTick stores the last time the decayer ticked. Guarded by atomic.
lastTick atomic.Value

// bumpCh queues bump commands to be processed by the loop.
bumpCh chan bumpCmd
// bumpTagCh queues bump commands to be processed by the loop.
bumpTagCh chan bumpCmd
removeTagCh chan removeCmd
closeTagCh chan *decayingTag

// closure thingies.
closeCh chan struct{}
Expand Down Expand Up @@ -70,13 +78,15 @@ func NewDecayer(cfg *DecayerCfg, mgr *BasicConnMgr) (*decayer, error) {
}

d := &decayer{
cfg: cfg,
mgr: mgr,
clock: cfg.Clock,
knownTags: make(map[string]*decayingTag),
bumpCh: make(chan bumpCmd, 128),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
cfg: cfg,
mgr: mgr,
clock: cfg.Clock,
knownTags: make(map[string]*decayingTag),
bumpTagCh: make(chan bumpCmd, 128),
removeTagCh: make(chan removeCmd, 128),
closeTagCh: make(chan *decayingTag, 128),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
}

d.lastTick.Store(d.clock.Now())
Expand Down Expand Up @@ -203,7 +213,7 @@ func (d *decayer) process() {
delete(visit, tag)
}

case bmp = <-d.bumpCh:
case bmp = <-d.bumpTagCh:
var (
now = d.clock.Now()
peer, tag = bmp.peer, bmp.tag
Expand Down Expand Up @@ -231,9 +241,42 @@ func (d *decayer) process() {

s.Unlock()

case rm := <-d.removeTagCh:
s := d.mgr.segments.get(rm.peer)
s.Lock()

p := s.tagInfoFor(rm.peer)
v, ok := p.decaying[rm.tag]
if !ok {
s.Unlock()
continue
}
p.value -= v.Value
delete(p.decaying, rm.tag)
s.Unlock()

case t := <-d.closeTagCh:
// Stop tracking the tag.
d.tagsMu.Lock()
delete(d.knownTags, t.name)
d.tagsMu.Unlock()

// Remove the tag from all peers that had it in the connmgr.
for _, s := range d.mgr.segments {
// visit all segments, and attempt to remove the tag from all the peers it stores.
s.Lock()
for _, p := range s.peers {
if dt, ok := p.decaying[t]; ok {
// decrease the value of the tagInfo, and delete the tag.
p.value -= dt.Value
delete(p.decaying, t)
}
}
s.Unlock()
}

case <-d.closeCh:
return

}
}
}
Expand All @@ -247,6 +290,10 @@ type decayingTag struct {
nextTick time.Time
decayFn connmgr.DecayFn
bumpFn connmgr.BumpFn

// closed marks this tag as closed, so that if it's bumped after being
// closed, we can return an error. 0 = false; 1 = true; guarded by atomic.
closed int32
}

var _ connmgr.DecayingTag = (*decayingTag)(nil)
Expand All @@ -261,18 +308,49 @@ func (t *decayingTag) Interval() time.Duration {

// Bump bumps a tag for this peer.
func (t *decayingTag) Bump(p peer.ID, delta int) error {
if atomic.LoadInt32(&t.closed) == 1 {
return fmt.Errorf("decaying tag %s had been closed; no further bumps are accepted", t.name)
}

bmp := bumpCmd{peer: p, tag: t, delta: delta}

select {
case t.trkr.bumpCh <- bmp:
case t.trkr.bumpTagCh <- bmp:
return nil

default:
return fmt.Errorf(
"unable to bump decaying tag for peer %s, tag %s, delta %d; queue full (len=%d)",
p.Pretty(),
t.name,
delta,
len(t.trkr.bumpCh))
p.Pretty(), t.name, delta, len(t.trkr.bumpTagCh))
}
}

func (t *decayingTag) Remove(p peer.ID) error {
if atomic.LoadInt32(&t.closed) == 1 {
return fmt.Errorf("decaying tag %s had been closed; no further removals are accepted", t.name)
}

rm := removeCmd{peer: p, tag: t}

select {
case t.trkr.removeTagCh <- rm:
return nil
default:
return fmt.Errorf(
"unable to remove decaying tag for peer %s, tag %s; queue full (len=%d)",
p.Pretty(), t.name, len(t.trkr.removeTagCh))
}
}

func (t *decayingTag) Close() error {
if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
log.Warnf("duplicate decaying tag closure: %s; skipping", t.name)
return nil
}

select {
case t.trkr.closeTagCh <- t:
return nil
default:
return fmt.Errorf("unable to close decaying tag %s; queue full (len=%d)", t.name, len(t.trkr.closeTagCh))
}
}
100 changes: 99 additions & 1 deletion p2p/net/connmgr/decay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestResolutionMisaligned(t *testing.T) {
// allow the background goroutine to process bumps.
<-time.After(500 * time.Millisecond)

// nothing has happened.
// first tick.
mockClock.Add(TestResolution)
require.Equal(1000, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(1000, mgr.GetTagInfo(id).Tags["bop"])
Expand All @@ -301,6 +301,104 @@ func TestResolutionMisaligned(t *testing.T) {
require.Equal(1997, mgr.GetTagInfo(id).Value)
}

func TestTagRemoval(t *testing.T) {
var (
id1, id2 = tu.RandPeerIDFatal(t), tu.RandPeerIDFatal(t)
mgr, decay, mockClock = testDecayTracker(t)
require = require.New(t)
)

tag1, err := decay.RegisterDecayingTag("beep", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)

tag2, err := decay.RegisterDecayingTag("bop", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)

// id1 has both tags; id2 only has the first tag.
_ = tag1.Bump(id1, 1000)
_ = tag2.Bump(id1, 1000)
_ = tag1.Bump(id2, 1000)

// allow the background goroutine to process bumps.
<-time.After(500 * time.Millisecond)

// first tick.
mockClock.Add(TestResolution)
require.Equal(999, mgr.GetTagInfo(id1).Tags["beep"])
require.Equal(999, mgr.GetTagInfo(id1).Tags["bop"])
require.Equal(999, mgr.GetTagInfo(id2).Tags["beep"])

require.Equal(999*2, mgr.GetTagInfo(id1).Value)
require.Equal(999, mgr.GetTagInfo(id2).Value)

// remove tag1 from p1.
err = tag1.Remove(id1)

// allow the background goroutine to process the removal.
<-time.After(500 * time.Millisecond)
require.NoError(err)

// next tick. both peers only have 1 tag, both at 998 value.
mockClock.Add(TestResolution)
require.Zero(mgr.GetTagInfo(id1).Tags["beep"])
require.Equal(998, mgr.GetTagInfo(id1).Tags["bop"])
require.Equal(998, mgr.GetTagInfo(id2).Tags["beep"])

require.Equal(998, mgr.GetTagInfo(id1).Value)
require.Equal(998, mgr.GetTagInfo(id2).Value)

// remove tag1 from p1 again; no error.
err = tag1.Remove(id1)
require.NoError(err)
}

func TestTagClosure(t *testing.T) {
var (
id = tu.RandPeerIDFatal(t)
mgr, decay, mockClock = testDecayTracker(t)
require = require.New(t)
)

tag1, err := decay.RegisterDecayingTag("beep", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)

tag2, err := decay.RegisterDecayingTag("bop", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)

_ = tag1.Bump(id, 1000)
_ = tag2.Bump(id, 1000)
// allow the background goroutine to process bumps.
<-time.After(500 * time.Millisecond)

// nothing has happened.
mockClock.Add(TestResolution)
require.Equal(999, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(999, mgr.GetTagInfo(id).Tags["bop"])
require.Equal(999*2, mgr.GetTagInfo(id).Value)

// next tick; tag1 would've ticked.
mockClock.Add(TestResolution)
require.Equal(998, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(998, mgr.GetTagInfo(id).Tags["bop"])
require.Equal(998*2, mgr.GetTagInfo(id).Value)

// close the tag.
err = tag1.Close()
require.NoError(err)

// allow the background goroutine to process the closure.
<-time.After(500 * time.Millisecond)
require.Equal(998, mgr.GetTagInfo(id).Value)

// a second closure should not error.
err = tag1.Close()
require.NoError(err)

// bumping a tag after it's been closed should error.
err = tag1.Bump(id, 5)
require.Error(err)
}

func testDecayTracker(tb testing.TB) (*BasicConnMgr, connmgr.Decayer, *clock.Mock) {
mockClock := clock.NewMock()
cfg := &DecayerCfg{
Expand Down

0 comments on commit 9fba560

Please sign in to comment.