Adds exponential backoff to re-spawning new streams for supposedly dead peers #483
.gitignore

```diff
@@ -1,3 +1,5 @@
 cover.out
 prof.out
 go-floodsub.test
+
+.idea/
```
backoff.go (new file)

```go
package pubsub

import (
	"context"
	"math/rand"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
)

const (
	MinBackoffDelay        = 100 * time.Millisecond
	MaxBackoffDelay        = 10 * time.Second
	TimeToLive             = 10 * time.Minute
	BackoffCleanupInterval = 1 * time.Minute
	BackoffMultiplier      = 2
	MaxBackoffJitterCoff   = 100
	MaxBackoffAttempts     = 4
)

type backoffHistory struct {
	duration  time.Duration
	lastTried time.Time
	attempts  int
}

type backoff struct {
	mu          sync.Mutex
	info        map[peer.ID]*backoffHistory
	ct          int           // size threshold that kicks off the cleaner
	ci          time.Duration // cleanup interval
	maxAttempts int           // maximum backoff attempts prior to ejection
}

func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Duration, maxAttempts int) *backoff {
	b := &backoff{
		mu:          sync.Mutex{},
		ct:          sizeThreshold,
		ci:          cleanupInterval,
		maxAttempts: maxAttempts,
		info:        make(map[peer.ID]*backoffHistory),
	}

	rand.Seed(time.Now().UnixNano()) // used for jitter
	go b.cleanupLoop(ctx)

	return b
}

func (b *backoff) updateAndGet(id peer.ID) time.Duration {
	b.mu.Lock()
	defer b.mu.Unlock()

	h, ok := b.info[id]
	switch {
	case !ok || time.Since(h.lastTried) > TimeToLive:
		// first request, or the old record has expired: go immediately.
		h = &backoffHistory{
			duration: time.Duration(0),
			attempts: 0,
		}

	case h.duration < MinBackoffDelay:
		h.duration = MinBackoffDelay

	case h.duration < MaxBackoffDelay:
		jitter := rand.Intn(MaxBackoffJitterCoff)
		h.duration = (BackoffMultiplier * h.duration) + time.Duration(jitter)*time.Millisecond
		if h.duration > MaxBackoffDelay || h.duration < 0 {
			h.duration = MaxBackoffDelay
		}
	}

	h.lastTried = time.Now()
	h.attempts++

	b.info[id] = h
	return h.duration
}

func (b *backoff) peerExceededBackoffThreshold(id peer.ID) bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	h, ok := b.info[id]
	if !ok {
		return false // no record of this peer remains, so it is within the threshold.
	}
	return h.attempts > b.maxAttempts
}

func (b *backoff) cleanup() {
	b.mu.Lock()
	defer b.mu.Unlock()

	for id, h := range b.info {
		if time.Since(h.lastTried) > TimeToLive {
			delete(b.info, id)
		}
	}
}

func (b *backoff) cleanupLoop(ctx context.Context) {
	// a single ticker for the whole loop; calling time.Tick inside the
	// select would allocate a new, never-stopped ticker on every iteration.
	ticker := time.NewTicker(b.ci)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return // pubsub shutting down
		case <-ticker.C:
			b.cleanup()
		}
	}
}
```
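For intuition, the schedule this produces for a single repeatedly-dying peer is: zero delay on the first attempt, then MinBackoffDelay, then roughly doubling until the delay pins at MaxBackoffDelay. Below is a minimal standalone sketch of that schedule, ignoring jitter; the constants are local copies for illustration, not the package's:

```go
package main

import (
	"fmt"
	"time"
)

// Sketch of the delay schedule updateAndGet yields for one peer, minus
// the jitter term: 0, 100ms, 200ms, 400ms, ... capped at 10s.
func main() {
	const (
		minDelay   = 100 * time.Millisecond
		maxDelay   = 10 * time.Second
		multiplier = 2
	)

	d := time.Duration(0) // first attempt goes through immediately
	for attempt := 1; attempt <= 10; attempt++ {
		fmt.Printf("attempt %2d: wait %v\n", attempt, d)
		switch { // mirrors the switch in updateAndGet
		case d < minDelay:
			d = minDelay
		case d < maxDelay:
			d *= multiplier
			if d > maxDelay {
				d = maxDelay
			}
		}
	}
}
```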
backoff_test.go (new file)

```go
package pubsub

import (
	"context"
	"fmt"
	"math"
	"testing"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
)

func TestBackoff_Update(t *testing.T) {
	id1 := peer.ID("peer-1")
	id2 := peer.ID("peer-2")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	size := 10
	cleanupInterval := 5 * time.Second
	maxBackoffAttempts := 10

	b := newBackoff(ctx, size, cleanupInterval, maxBackoffAttempts)

	if len(b.info) > 0 {
		t.Fatal("non-empty info map for backoff")
	}

	if d := b.updateAndGet(id1); d != time.Duration(0) {
		t.Fatalf("invalid initialization: %v", d)
	}
	if d := b.updateAndGet(id2); d != time.Duration(0) {
		t.Fatalf("invalid initialization: %v", d)
	}

	for i := 0; i < maxBackoffAttempts-1; i++ {
		got := b.updateAndGet(id1)

		expected := time.Duration(math.Pow(BackoffMultiplier, float64(i)) *
			float64(MinBackoffDelay+MaxBackoffJitterCoff*time.Millisecond))
		if expected > MaxBackoffDelay {
			expected = MaxBackoffDelay
		}

		// accounting for jitter, the expected bound must always be >= the actual backoff.
		if expected < got {
			t.Fatalf("invalid backoff result, expected: %v, got: %v", expected, got)
		}

		// update attempts on id1 are still below the threshold, so the peer must not exceed it yet.
		if b.peerExceededBackoffThreshold(id1) {
			t.Fatalf("invalid exceeding threshold status")
		}
	}

	// try once more, now beyond the threshold, and expect it to be exceeded.
	b.updateAndGet(id1)
	if !b.peerExceededBackoffThreshold(id1) {
		t.Fatal("update beyond max attempts does not reflect threshold")
	}

	got := b.updateAndGet(id2)
	if got != MinBackoffDelay {
		t.Fatalf("invalid backoff result, expected: %v, got: %v", MinBackoffDelay, got)
	}

	// set lastTried of id2 far enough in the past that its record resets on the next try.
	b.info[id2].lastTried = time.Now().Add(-TimeToLive)
	got = b.updateAndGet(id2)
	if got != time.Duration(0) {
		t.Fatalf("invalid ttl expiration, expected: %v, got: %v", time.Duration(0), got)
	}

	// update attempts on id2 are below the threshold, so the peer must not exceed it.
	if b.peerExceededBackoffThreshold(id2) {
		t.Fatalf("invalid exceeding threshold status")
	}

	if len(b.info) != 2 {
		t.Fatalf("pre-invalidation attempt, info map size mismatch, expected: %d, got: %d", 2, len(b.info))
	}
}

func TestBackoff_Clean(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	size := 10
	cleanupInterval := 2 * time.Second
	maxBackoffAttempts := 100 // set attempts high so that only the cleanup logic is exercised here.
	b := newBackoff(ctx, size, cleanupInterval, maxBackoffAttempts)

	for i := 0; i < size; i++ {
		id := peer.ID(fmt.Sprintf("peer-%d", i))
		b.updateAndGet(id)
		b.info[id].lastTried = time.Now().Add(-TimeToLive) // enforces expiry
	}

	if len(b.info) != size {
		t.Fatalf("info map size mismatch, expected: %d, got: %d", size, len(b.info))
	}

	// wait for the cleanup loop to kick in.
	time.Sleep(2 * cleanupInterval)

	// by the next update, the expired records should already be gone.
	got := b.updateAndGet(peer.ID("some-new-peer"))
	if got != time.Duration(0) {
		t.Fatalf("invalid backoff result, expected: %v, got: %v", time.Duration(0), got)
	}

	// except for "some-new-peer", every other record must have been cleaned up.
	if len(b.info) != 1 {
		t.Fatalf("info map size mismatch, expected: %d, got: %d", 1, len(b.info))
	}
}
```
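For concreteness, with the PR's constants the test's bound on attempt i works out to 2^i × (100ms + 100ms): 200ms, 400ms, 800ms, 1.6s, 3.2s, 6.4s, then clamped to MaxBackoffDelay = 10s from i = 6 onward. The bound is deliberately loose: it compounds the maximum 100ms jitter through every doubling, whereas updateAndGet adds at most 100ms of fresh jitter per step, so the actual delay always stays at or below the bound.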
comm.go

```diff
@@ -4,6 +4,7 @@ import (
 	"bufio"
 	"context"
 	"io"
+	"time"

 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
@@ -121,6 +122,17 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
 	}
 }

+func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
+	delay := p.deadPeerBackoff.updateAndGet(pid)
+
+	select {
+	case <-time.After(delay):
+		p.handleNewPeer(ctx, pid, outgoing)
+	case <-ctx.Done():
+		return
+	}
+}
+
 func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
 	pid := s.Conn().RemotePeer()
 	r := protoio.NewDelimitedReader(s, p.maxMessageSize)
```

Review thread on handleNewPeerWithBackoff (resolved):

Reviewer: I think we need to add a failure mode if we have backed off too much and simply give up; say we try up to 10 times and then … How does that sound?

Reviewer: maybe 10 is even too much, 3-4 attempts should be enough.

Author: Done
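The select in handleNewPeerWithBackoff is the standard delay-or-cancel pattern: sleep out the backoff delay, but return promptly if the context is cancelled (e.g. pubsub shutting down). A minimal standalone sketch of the same pattern; runAfter and the printed messages are illustrative, not part of the PR:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runAfter waits for delay, then runs task, unless the context is
// cancelled first, in which case the task is silently dropped.
func runAfter(ctx context.Context, delay time.Duration, task func()) {
	select {
	case <-time.After(delay):
		task()
	case <-ctx.Done():
		// cancelled before the delay elapsed
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	runAfter(ctx, 10*time.Millisecond, func() { fmt.Println("respawned writer") })
	runAfter(ctx, time.Second, func() { fmt.Println("never printed") }) // ctx expires first
}
```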
pubsub.go

```diff
@@ -110,6 +110,8 @@ type PubSub struct {
 	peerDeadPrioLk sync.RWMutex
 	peerDeadMx     sync.Mutex
 	peerDeadPend   map[peer.ID]struct{}
+	// backoff for retrying new connections to dead peers
+	deadPeerBackoff *backoff

 	// The set of topics we are subscribed to
 	mySubs map[string]map[*Subscription]struct{}
@@ -252,6 +254,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
 		newPeerError:    make(chan peer.ID),
 		peerDead:        make(chan struct{}, 1),
 		peerDeadPend:    make(map[peer.ID]struct{}),
+		deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts),
 		cancelCh:        make(chan *Subscription),
 		getPeers:        make(chan *listPeerReq),
 		addSub:          make(chan *addSubReq),
@@ -680,13 +683,15 @@ func (p *PubSub) handleDeadPeers() {
 		close(ch)

-		if p.host.Network().Connectedness(pid) == network.Connected {
+		if p.host.Network().Connectedness(pid) == network.Connected &&
+			!p.deadPeerBackoff.peerExceededBackoffThreshold(pid) {
 			// still connected, must be a duplicate connection being closed.
 			// we respawn the writer as we need to ensure there is a stream active
 			log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
 			messages := make(chan *RPC, p.peerOutboundQueueSize)
 			messages <- p.getHelloPacket()
-			go p.handleNewPeer(p.ctx, pid, messages)
+			go p.handleNewPeerWithBackoff(p.ctx, pid, messages)
 			p.peers[pid] = messages
 			continue
 		}
```

Review comment on the peerExceededBackoffThreshold check:

Reviewer: we might want to (debug) log this.
Review thread on updateAndGet:

Reviewer: let's get the time after checking the max attempts; that will avoid the gettimeofday call in that case.

Author: Please read my reply to the comment below, as this part has changed.
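A hypothetical sketch of the reviewer's suggestion, using trimmed standalone types rather than the PR's final code: check the attempt cap before touching the clock, so updateAndGet never calls time.Now() for a peer that has already been given up on.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// history and limiter are trimmed, hypothetical stand-ins for the PR's
// backoffHistory/backoff types, just enough to show the reordering.
type history struct {
	duration  time.Duration
	lastTried time.Time
	attempts  int
}

type limiter struct {
	mu          sync.Mutex
	info        map[string]*history
	maxAttempts int
}

// updateAndGet checks the attempt cap first: once a peer is over
// maxAttempts it returns ok=false without ever reading the clock.
func (l *limiter) updateAndGet(id string) (delay time.Duration, ok bool) {
	l.mu.Lock()
	defer l.mu.Unlock()

	h, found := l.info[id]
	if !found {
		h = &history{}
		l.info[id] = h
	} else if h.attempts > l.maxAttempts {
		return 0, false // over the cap: skip the gettimeofday call entirely
	}

	delay = h.duration
	h.lastTried = time.Now() // only reached for peers still under the cap
	h.attempts++
	// (exponential growth of h.duration elided; see the PR's switch statement)
	return delay, true
}

func main() {
	l := &limiter{info: make(map[string]*history), maxAttempts: 2}
	for i := 0; i < 5; i++ {
		d, ok := l.updateAndGet("peer-1")
		fmt.Printf("attempt %d: delay=%v ok=%v\n", i+1, d, ok)
	}
}
```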