Commit

Adds exponential backoff to re-spawning new streams for supposedly dead peers (#483)

* updates gitignore

* implements draft solution

* consolidates update and get

* extends test

* adds cleaner logic

* removes a redundant else case

* refactors cleanup in a goroutine

* adds a jitter to backoff

* stretches the sleep for cleanup

* reduces jitter time

* fixes a test

* adds maximum backoff attempts

* returns error for closing channel

* refactors peer status exceed backoff threshold

* converts if-else to switch

* nit

* consolidates update and maximum backoff check

* bug fix

* nit

* refactors cleanup with a ticker object
yhassanzadeh13 authored May 30, 2022
1 parent 0ea9140 commit 06b5ba4
Showing 5 changed files with 251 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,3 +1,5 @@
cover.out
prof.out
go-floodsub.test

.idea/
107 changes: 107 additions & 0 deletions backoff.go
@@ -0,0 +1,107 @@
package pubsub

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
)

const (
	MinBackoffDelay        = 100 * time.Millisecond
	MaxBackoffDelay        = 10 * time.Second
	TimeToLive             = 10 * time.Minute
	BackoffCleanupInterval = 1 * time.Minute
	BackoffMultiplier      = 2
	MaxBackoffJitterCoff   = 100
	MaxBackoffAttempts     = 4
)

// backoffHistory tracks the backoff state of a single peer.
type backoffHistory struct {
	duration  time.Duration
	lastTried time.Time
	attempts  int
}

type backoff struct {
	mu          sync.Mutex
	info        map[peer.ID]*backoffHistory
	ct          int           // size threshold that kicks off the cleaner
	ci          time.Duration // cleanup intervals
	maxAttempts int           // maximum backoff attempts prior to ejection
}

func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Duration, maxAttempts int) *backoff {
	b := &backoff{
		mu:          sync.Mutex{},
		ct:          sizeThreshold,
		ci:          cleanupInterval,
		maxAttempts: maxAttempts,
		info:        make(map[peer.ID]*backoffHistory),
	}

	rand.Seed(time.Now().UnixNano()) // used for jitter
	go b.cleanupLoop(ctx)

	return b
}

// updateAndGet records a backoff attempt for id and returns the delay to wait
// before respawning its writer. The first attempt (or the first after
// TimeToLive has elapsed since the last try) returns zero; each subsequent
// attempt doubles the delay and adds jitter, capped at MaxBackoffDelay. Once
// a peer has used up maxAttempts, an error is returned instead.
func (b *backoff) updateAndGet(id peer.ID) (time.Duration, error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	h, ok := b.info[id]
	switch {
	case !ok || time.Since(h.lastTried) > TimeToLive:
		// first request goes immediately.
		h = &backoffHistory{
			duration: time.Duration(0),
			attempts: 0,
		}
	case h.attempts >= b.maxAttempts:
		return 0, fmt.Errorf("peer %s has reached its maximum backoff attempts", id)

	case h.duration < MinBackoffDelay:
		h.duration = MinBackoffDelay

	case h.duration < MaxBackoffDelay:
		jitter := rand.Intn(MaxBackoffJitterCoff)
		h.duration = (BackoffMultiplier * h.duration) + time.Duration(jitter)*time.Millisecond
		if h.duration > MaxBackoffDelay || h.duration < 0 {
			h.duration = MaxBackoffDelay
		}
	}

	h.attempts += 1
	h.lastTried = time.Now()
	b.info[id] = h
	return h.duration, nil
}

// cleanup drops entries that have not been tried within TimeToLive.
func (b *backoff) cleanup() {
	b.mu.Lock()
	defer b.mu.Unlock()

	for id, h := range b.info {
		if time.Since(h.lastTried) > TimeToLive {
			delete(b.info, id)
		}
	}
}

// cleanupLoop runs cleanup on every cleanup interval until ctx is cancelled.
func (b *backoff) cleanupLoop(ctx context.Context) {
	ticker := time.NewTicker(b.ci)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return // pubsub shutting down
		case <-ticker.C:
			b.cleanup()
		}
	}
}
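
For orientation, here is a minimal sketch of how these delays play out for a single flapping peer — hypothetical code, not part of the commit; it assumes placement inside package pubsub, since newBackoff and updateAndGet are unexported, and the peer ID is made up. The first call returns zero, later calls roughly double with up to MaxBackoffJitterCoff milliseconds of jitter (capped at MaxBackoffDelay), and the call after MaxBackoffAttempts is rejected with an error:

package pubsub

import (
	"context"
	"fmt"

	"github.com/libp2p/go-libp2p-core/peer"
)

// demoBackoffProgression is a hypothetical helper illustrating the delay
// schedule produced by updateAndGet for one peer.
func demoBackoffProgression() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	b := newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts)
	id := peer.ID("example-peer") // hypothetical peer ID

	for attempt := 1; ; attempt++ {
		delay, err := b.updateAndGet(id)
		if err != nil {
			// after MaxBackoffAttempts updates the peer is ejected.
			fmt.Printf("attempt %d: %v\n", attempt, err)
			return
		}
		fmt.Printf("attempt %d: wait %v\n", attempt, delay)
	}
}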
122 changes: 122 additions & 0 deletions backoff_test.go
@@ -0,0 +1,122 @@
package pubsub

import (
	"context"
	"fmt"
	"math"
	"testing"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
)

func TestBackoff_Update(t *testing.T) {
	id1 := peer.ID("peer-1")
	id2 := peer.ID("peer-2")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	size := 10
	cleanupInterval := 5 * time.Second
	maxBackoffAttempts := 10

	b := newBackoff(ctx, size, cleanupInterval, maxBackoffAttempts)

	if len(b.info) > 0 {
		t.Fatal("non-empty info map for backoff")
	}

	if d, err := b.updateAndGet(id1); d != time.Duration(0) || err != nil {
		t.Fatalf("invalid initialization: %v, \t, %s", d, err)
	}
	if d, err := b.updateAndGet(id2); d != time.Duration(0) || err != nil {
		t.Fatalf("invalid initialization: %v, \t, %s", d, err)
	}

	for i := 0; i < maxBackoffAttempts-1; i++ {
		got, err := b.updateAndGet(id1)
		if err != nil {
			t.Fatalf("unexpected error post update: %s", err)
		}

		expected := time.Duration(math.Pow(BackoffMultiplier, float64(i)) *
			float64(MinBackoffDelay+MaxBackoffJitterCoff*time.Millisecond))
		if expected > MaxBackoffDelay {
			expected = MaxBackoffDelay
		}

		if expected < got { // considering jitter, the expected worst-case backoff must always be greater than or equal to the actual one.
			t.Fatalf("invalid backoff result, expected: %v, got: %v", expected, got)
		}
	}

	// trying once more beyond the threshold, hence expecting an error
	if _, err := b.updateAndGet(id1); err == nil {
		t.Fatalf("expected an error for going beyond threshold but got nil")
	}

	got, err := b.updateAndGet(id2)
	if err != nil {
		t.Fatalf("unexpected error post update: %s", err)
	}
	if got != MinBackoffDelay {
		t.Fatalf("invalid backoff result, expected: %v, got: %v", MinBackoffDelay, got)
	}

	// sets lastTried of id2 far enough in the past that it resets upon the next try.
	// update attempts on id2 are below the threshold, hence the peer should never exceed the backoff attempt threshold.
	b.info[id2].lastTried = time.Now().Add(-TimeToLive)
	got, err = b.updateAndGet(id2)
	if err != nil {
		t.Fatalf("unexpected error post update: %s", err)
	}
	if got != time.Duration(0) {
		t.Fatalf("invalid ttl expiration, expected: %v, got: %v", time.Duration(0), got)
	}

	if len(b.info) != 2 {
		t.Fatalf("pre-invalidation attempt, info map size mismatch, expected: %d, got: %d", 2, len(b.info))
	}
}

func TestBackoff_Clean(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	size := 10
	cleanupInterval := 2 * time.Second
	maxBackoffAttempts := 100 // setting attempts to a high number, hence testing the cleanup logic.
	b := newBackoff(ctx, size, cleanupInterval, maxBackoffAttempts)

	for i := 0; i < size; i++ {
		id := peer.ID(fmt.Sprintf("peer-%d", i))
		_, err := b.updateAndGet(id)
		if err != nil {
			t.Fatalf("unexpected error post update: %s", err)
		}
		b.info[id].lastTried = time.Now().Add(-TimeToLive) // enforces expiry
	}

	if len(b.info) != size {
		t.Fatalf("info map size mismatch, expected: %d, got: %d", size, len(b.info))
	}

	// waits for a cleanup loop to kick in
	time.Sleep(2 * cleanupInterval)

	// the next update should find an already cleaned-up map
	got, err := b.updateAndGet(peer.ID("some-new-peer"))
	if err != nil {
		t.Fatalf("unexpected error post update: %s", err)
	}
	if got != time.Duration(0) {
		t.Fatalf("invalid backoff result, expected: %v, got: %v", time.Duration(0), got)
	}

	// except "some-new-peer", every other record must have been cleaned up
	if len(b.info) != 1 {
		t.Fatalf("info map size mismatch, expected: %d, got: %d", 1, len(b.info))
	}
}
10 changes: 10 additions & 0 deletions comm.go
@@ -4,6 +4,7 @@ import (
	"bufio"
	"context"
	"io"
	"time"

	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
@@ -121,6 +122,15 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC)
	}
}

// handleNewPeerWithBackoff waits out the given backoff delay and then spawns
// the writer via handleNewPeer, aborting early if the context is cancelled.
func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, backoff time.Duration, outgoing <-chan *RPC) {
	select {
	case <-time.After(backoff):
		p.handleNewPeer(ctx, pid, outgoing)
	case <-ctx.Done():
		return
	}
}

func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
	pid := s.Conn().RemotePeer()
	r := protoio.NewDelimitedReader(s, p.maxMessageSize)
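
Note that the delay is served inside the spawned goroutine rather than in the caller, so the event loop in handleDeadPeers (below) never blocks; if pubsub shuts down mid-wait, the goroutine simply exits. A hypothetical sketch of that cancellation behavior, assuming placement in package pubsub with made-up values:

package pubsub

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
)

// demoBackoffCancel is a hypothetical helper: cancelling the context during
// the backoff wait makes the goroutine return without spawning the writer.
func demoBackoffCancel(p *PubSub, pid peer.ID) {
	ctx, cancel := context.WithCancel(context.Background())
	messages := make(chan *RPC, p.peerOutboundQueueSize)
	go p.handleNewPeerWithBackoff(ctx, pid, 5*time.Second, messages)
	cancel() // ctx.Done() fires before the 5s timer; handleNewPeer never runs
}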
11 changes: 10 additions & 1 deletion pubsub.go
@@ -112,6 +112,8 @@ type PubSub struct {
	peerDeadPrioLk sync.RWMutex
	peerDeadMx     sync.Mutex
	peerDeadPend   map[peer.ID]struct{}
	// backoff for retrying new connections to dead peers
	deadPeerBackoff *backoff

	// The set of topics we are subscribed to
	mySubs map[string]map[*Subscription]struct{}
@@ -255,6 +257,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
		newPeerError:    make(chan peer.ID),
		peerDead:        make(chan struct{}, 1),
		peerDeadPend:    make(map[peer.ID]struct{}),
		deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts),
		cancelCh:        make(chan *Subscription),
		getPeers:        make(chan *listPeerReq),
		addSub:          make(chan *addSubReq),
@@ -694,12 +697,18 @@ func (p *PubSub) handleDeadPeers() {
		close(ch)

		if p.host.Network().Connectedness(pid) == network.Connected {
			backoffDelay, err := p.deadPeerBackoff.updateAndGet(pid)
			if err != nil {
				log.Debug(err)
				continue
			}

			// still connected, must be a duplicate connection being closed.
			// we respawn the writer as we need to ensure there is a stream active
			log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
			messages := make(chan *RPC, p.peerOutboundQueueSize)
			messages <- p.getHelloPacket()
			go p.handleNewPeerWithBackoff(p.ctx, pid, backoffDelay, messages)
			p.peers[pid] = messages
			continue
		}
