Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

fix flaky TestTaggingUseful test #494

Merged
merged 1 commit into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module github.com/ipfs/go-bitswap

require (
github.com/benbjohnson/clock v1.1.0
github.com/cskr/pubsub v1.0.2
github.com/gogo/protobuf v1.3.1
github.com/google/uuid v1.1.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETF
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
Expand Down
37 changes: 21 additions & 16 deletions internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/ipfs/go-bitswap/internal/testutil"
message "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
Expand Down Expand Up @@ -91,13 +92,13 @@ type engineSet struct {
}

func newTestEngine(ctx context.Context, idStr string) engineSet {
return newTestEngineWithSampling(ctx, idStr, shortTerm, nil)
return newTestEngineWithSampling(ctx, idStr, shortTerm, nil, clock.New())
}

func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}) engineSet {
func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}, clock clock.Clock) engineSet {
fpt := &fakePeerTagger{}
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
e := newEngine(bs, 4, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh))
e := newEngine(bs, 4, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh, clock))
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
return engineSet{
Peer: peer.ID(idStr),
Expand Down Expand Up @@ -184,7 +185,7 @@ func peerIsPartner(p peer.ID, e *Engine) bool {

func TestOutboxClosedWhenEngineClosed(t *testing.T) {
t.SkipNow() // TODO implement *Engine.Close
e := newEngine(blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -512,7 +513,7 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) {
testCases = onlyTestCases
}

e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
for i, testCase := range testCases {
t.Logf("Test case %d:", i)
Expand Down Expand Up @@ -668,7 +669,7 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) {
testCases = onlyTestCases
}

e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

var next envChan
Expand Down Expand Up @@ -853,7 +854,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
ctx := context.Background()
for i := 0; i < numRounds; i++ {
expected := make([][]string, 0, len(testcases))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
for _, testcase := range testcases {
set := testcase[0]
Expand All @@ -878,7 +879,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
Expand Down Expand Up @@ -922,7 +923,7 @@ func TestSendDontHave(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
Expand Down Expand Up @@ -986,7 +987,7 @@ func TestWantlistForPeer(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e := newEngine(bs, 4, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil, clock.New()))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
Expand Down Expand Up @@ -1044,13 +1045,15 @@ func TestTaggingPeers(t *testing.T) {
}

func TestTaggingUseful(t *testing.T) {
peerSampleInterval := 1 * time.Millisecond
peerSampleIntervalHalf := 10 * time.Millisecond

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

sampleCh := make(chan struct{})
me := newTestEngineWithSampling(ctx, "engine", peerSampleInterval, sampleCh)
mockClock := clock.NewMock()
me := newTestEngineWithSampling(ctx, "engine", peerSampleIntervalHalf*2, sampleCh, mockClock)
mockClock.Add(1 * time.Millisecond)
friend := peer.ID("friend")

block := blocks.NewBlock([]byte("foobar"))
Expand All @@ -1061,18 +1064,18 @@ func TestTaggingUseful(t *testing.T) {
if untagged := me.PeerTagger.count(me.Engine.tagUseful); untagged != 0 {
t.Fatalf("%d peers should be untagged but weren't", untagged)
}

mockClock.Add(peerSampleIntervalHalf)
me.Engine.MessageSent(friend, msg)

for j := 0; j < 2; j++ {
<-sampleCh
}
mockClock.Add(peerSampleIntervalHalf)
<-sampleCh

if tagged := me.PeerTagger.count(me.Engine.tagUseful); tagged != 1 {
t.Fatalf("1 peer should be tagged, but %d were", tagged)
}

for j := 0; j < longTermRatio; j++ {
mockClock.Add(peerSampleIntervalHalf * 2)
<-sampleCh
}
}
Expand All @@ -1082,6 +1085,7 @@ func TestTaggingUseful(t *testing.T) {
}

for j := 0; j < longTermRatio; j++ {
mockClock.Add(peerSampleIntervalHalf * 2)
<-sampleCh
}

Expand All @@ -1090,6 +1094,7 @@ func TestTaggingUseful(t *testing.T) {
}

for j := 0; j < longTermRatio; j++ {
mockClock.Add(peerSampleIntervalHalf * 2)
<-sampleCh
}

Expand Down
21 changes: 14 additions & 7 deletions internal/decision/scoreledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"
"time"

"github.com/benbjohnson/clock"
peer "github.com/libp2p/go-libp2p-core/peer"
)

Expand Down Expand Up @@ -55,6 +56,8 @@ type scoreledger struct {

// the record lock
lock sync.RWMutex

clock clock.Clock
}

// Receipt is a summary of the ledger for a given peer
Expand All @@ -73,7 +76,7 @@ func (l *scoreledger) AddToSentBytes(n int) {
l.lock.Lock()
defer l.lock.Unlock()
l.exchangeCount++
l.lastExchange = time.Now()
l.lastExchange = l.clock.Now()
l.bytesSent += uint64(n)
}

Expand All @@ -82,7 +85,7 @@ func (l *scoreledger) AddToReceivedBytes(n int) {
l.lock.Lock()
defer l.lock.Unlock()
l.exchangeCount++
l.lastExchange = time.Now()
l.lastExchange = l.clock.Now()
l.bytesRecv += uint64(n)
}

Expand Down Expand Up @@ -114,6 +117,7 @@ type DefaultScoreLedger struct {
peerSampleInterval time.Duration
// used by the tests to detect when a sample is taken
sampleCh chan struct{}
clock clock.Clock
}

// scoreWorker keeps track of how "useful" our peers are, updating scores in the
Expand All @@ -134,7 +138,7 @@ type DefaultScoreLedger struct {
// adjust it ±25% based on our debt ratio. Peers that have historically been
// more useful to us than we are to them get the highest score.
func (dsl *DefaultScoreLedger) scoreWorker() {
ticker := time.NewTicker(dsl.peerSampleInterval)
ticker := dsl.clock.Ticker(dsl.peerSampleInterval)
defer ticker.Stop()

type update struct {
Expand Down Expand Up @@ -236,9 +240,10 @@ func (dsl *DefaultScoreLedger) find(p peer.ID) *scoreledger {
}

// Returns a new scoreledger.
func newScoreLedger(p peer.ID) *scoreledger {
func newScoreLedger(p peer.ID, clock clock.Clock) *scoreledger {
return &scoreledger{
partner: p,
clock: clock,
}
}

Expand All @@ -255,7 +260,7 @@ func (dsl *DefaultScoreLedger) findOrCreate(p peer.ID) *scoreledger {
defer dsl.lock.Unlock()
l, ok := dsl.ledgerMap[p]
if !ok {
l = newScoreLedger(p)
l = newScoreLedger(p, dsl.clock)
dsl.ledgerMap[p] = l
}
return l
Expand Down Expand Up @@ -315,7 +320,7 @@ func (dsl *DefaultScoreLedger) PeerConnected(p peer.ID) {
defer dsl.lock.Unlock()
_, ok := dsl.ledgerMap[p]
if !ok {
dsl.ledgerMap[p] = newScoreLedger(p)
dsl.ledgerMap[p] = newScoreLedger(p, dsl.clock)
}
}

Expand All @@ -333,14 +338,16 @@ func NewDefaultScoreLedger() *DefaultScoreLedger {
ledgerMap: make(map[peer.ID]*scoreledger),
closing: make(chan struct{}),
peerSampleInterval: shortTerm,
clock: clock.New(),
}
}

// Creates a new instance of the default score ledger with testing
// parameters.
func NewTestScoreLedger(peerSampleInterval time.Duration, sampleCh chan struct{}) *DefaultScoreLedger {
func NewTestScoreLedger(peerSampleInterval time.Duration, sampleCh chan struct{}, clock clock.Clock) *DefaultScoreLedger {
dsl := NewDefaultScoreLedger()
dsl.peerSampleInterval = peerSampleInterval
dsl.sampleCh = sampleCh
dsl.clock = clock
return dsl
}