Skip to content

Commit

Permalink
consolidates update and maximum backoff check
Browse files Browse the repository at this point in the history
  • Loading branch information
yhassanzadeh13 committed May 27, 2022
1 parent 6ebc292 commit 8b64966
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 44 deletions.
21 changes: 6 additions & 15 deletions backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pubsub

import (
"context"
"fmt"
"math/rand"
"sync"
"time"
Expand Down Expand Up @@ -48,7 +49,7 @@ func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Dur
return b
}

func (b *backoff) updateAndGet(id peer.ID) time.Duration {
func (b *backoff) updateAndGet(id peer.ID) (time.Duration, error) {
b.mu.Lock()
defer b.mu.Unlock()

Expand All @@ -60,6 +61,8 @@ func (b *backoff) updateAndGet(id peer.ID) time.Duration {
duration: time.Duration(0),
attempts: 0,
}
case h.attempts > b.maxAttempts:
return 0, fmt.Errorf("peer %s has reached its maximum backoff attempts", id)

case h.duration < MinBackoffDelay:
h.duration = MinBackoffDelay
Expand All @@ -72,22 +75,10 @@ func (b *backoff) updateAndGet(id peer.ID) time.Duration {
}
}

h.lastTried = time.Now()
h.attempts += 1

h.lastTried = time.Now()
b.info[id] = h
return h.duration
}

func (b *backoff) peerExceededBackoffThreshold(id peer.ID) bool {
b.mu.Lock()
defer b.mu.Unlock()

h, ok := b.info[id]
if !ok {
return false // no record of this peer is still there, hence fine.
}
return h.attempts > b.maxAttempts
return h.duration, nil
}

func (b *backoff) cleanup() {
Expand Down
49 changes: 27 additions & 22 deletions backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,18 @@ func TestBackoff_Update(t *testing.T) {
t.Fatal("non-empty info map for backoff")
}

if d := b.updateAndGet(id1); d != time.Duration(0) {
t.Fatalf("invalid initialization: %v", d)
if d, err := b.updateAndGet(id1); d != time.Duration(0) || err != nil {
t.Fatalf("invalid initialization: %v, \t, %s", d, err)
}
if d := b.updateAndGet(id2); d != time.Duration(0) {
t.Fatalf("invalid initialization: %v", d)
if d, err := b.updateAndGet(id2); d != time.Duration(0) || err != nil {
t.Fatalf("invalid initialization: %v, \t, %s", d, err)
}

for i := 0; i < maxBackoffAttempts-1; i++ {
got := b.updateAndGet(id1)
got, err := b.updateAndGet(id1)
if err != nil {
t.Fatalf("unexpected error post update: %s", err)
}

expected := time.Duration(math.Pow(BackoffMultiplier, float64(i)) *
float64(MinBackoffDelay+MaxBackoffJitterCoff*time.Millisecond))
Expand All @@ -46,36 +49,32 @@ func TestBackoff_Update(t *testing.T) {
if expected < got { // considering jitter, expected backoff must always be greater than or equal to actual.
t.Fatalf("invalid backoff result, expected: %v, got: %v", expected, got)
}

// update attempts on id1 are below threshold, hence peer should never go beyond backoff attempt threshold
if b.peerExceededBackoffThreshold(id1) {
t.Fatalf("invalid exceeding threshold status")
}
}

// trying once more beyond the threshold, hence expecting exceeding threshold
b.updateAndGet(id1)
if !b.peerExceededBackoffThreshold(id1) {
t.Fatal("update beyond max attempts does not reflect threshold")
if _, err := b.updateAndGet(id1); err != nil {
t.Fatalf("invalid exceeding threshold status: %s", err)
}

got := b.updateAndGet(id2)
got, err := b.updateAndGet(id2)
if err != nil {
t.Fatalf("unexpected error post update: %s", err)
}
if got != MinBackoffDelay {
t.Fatalf("invalid backoff result, expected: %v, got: %v", MinBackoffDelay, got)
}

// sets lastTried of id2 far enough in the past (a full TimeToLive ago) that its backoff record resets on the next try.
// update attempts on id2 are below threshold, hence peer should never go beyond backoff attempt threshold.
b.info[id2].lastTried = time.Now().Add(-TimeToLive)
got = b.updateAndGet(id2)
got, err = b.updateAndGet(id2)
if err != nil {
t.Fatalf("unexpected error post update: %s", err)
}
if got != time.Duration(0) {
t.Fatalf("invalid ttl expiration, expected: %v, got: %v", time.Duration(0), got)
}

// update attempts on id2 are below threshold, hence peer should never go beyond backoff attempt threshold
if b.peerExceededBackoffThreshold(id2) {
t.Fatalf("invalid exceeding threshold status")
}

if len(b.info) != 2 {
t.Fatalf("pre-invalidation attempt, info map size mismatch, expected: %d, got: %d", 2, len(b.info))
}
Expand All @@ -93,7 +92,10 @@ func TestBackoff_Clean(t *testing.T) {

for i := 0; i < size; i++ {
id := peer.ID(fmt.Sprintf("peer-%d", i))
b.updateAndGet(id)
_, err := b.updateAndGet(id)
if err != nil {
t.Fatalf("unexpected error post update: %s", err)
}
b.info[id].lastTried = time.Now().Add(-TimeToLive) // enforces expiry
}

Expand All @@ -105,7 +107,10 @@ func TestBackoff_Clean(t *testing.T) {
time.Sleep(2 * cleanupInterval)

// next update should trigger cleanup
got := b.updateAndGet(peer.ID("some-new-peer"))
got, err := b.updateAndGet(peer.ID("some-new-peer"))
if err != nil {
t.Fatalf("unexpected error post update: %s", err)
}
if got != time.Duration(0) {
t.Fatalf("invalid backoff result, expected: %v, got: %v", time.Duration(0), got)
}
Expand Down
6 changes: 2 additions & 4 deletions comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,9 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
}
}

func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
delay := p.deadPeerBackoff.updateAndGet(pid)

func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, backoff time.Duration, outgoing <-chan *RPC) {
select {
case <-time.After(delay):
case <-time.After(backoff):
p.handleNewPeer(ctx, pid, outgoing)
case <-ctx.Done():
return
Expand Down
10 changes: 7 additions & 3 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,15 +683,19 @@ func (p *PubSub) handleDeadPeers() {

close(ch)

if p.host.Network().Connectedness(pid) == network.Connected &&
!p.deadPeerBackoff.peerExceededBackoffThreshold(pid) {
if p.host.Network().Connectedness(pid) == network.Connected {
backoffDelay, err := p.deadPeerBackoff.updateAndGet(pid)
if err != nil {
log.Debug(err)
continue
}

// still connected, must be a duplicate connection being closed.
// we respawn the writer as we need to ensure there is a stream active
log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
messages := make(chan *RPC, p.peerOutboundQueueSize)
messages <- p.getHelloPacket()
go p.handleNewPeerWithBackoff(p.ctx, pid, messages)
go p.handleNewPeerWithBackoff(p.ctx, pid, backoffDelay, messages)
p.peers[pid] = messages
continue
}
Expand Down

0 comments on commit 8b64966

Please sign in to comment.