Adds exponential backoff to re-spawning new streams for supposedly dead peers #483
Changes from 6 commits
@@ -1,3 +1,5 @@
 cover.out
 prof.out
 go-floodsub.test
+
+.idea/
@@ -0,0 +1,71 @@
package pubsub

import (
	"sync"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
)

const (
	MinBackoffDelay   = 100 * time.Millisecond
	MaxBackoffDelay   = 10 * time.Second
	TimeToLive        = 10 * time.Minute
	BackoffMultiplier = 2
)

type backoffHistory struct {
	duration  time.Duration
	lastTried time.Time
}

type backoff struct {
	mu   sync.Mutex
	info map[peer.ID]*backoffHistory
	ct   int // size threshold that kicks off the cleaner
}

func newBackoff(sizeThreshold int) *backoff {
	return &backoff{
		mu:   sync.Mutex{},
		ct:   sizeThreshold,
		info: make(map[peer.ID]*backoffHistory),
	}
}

func (b *backoff) updateAndGet(id peer.ID) time.Duration {
	b.mu.Lock()
	defer b.mu.Unlock()

	h, ok := b.info[id]
	if !ok || time.Since(h.lastTried) > TimeToLive {
		// first request goes immediately.
		h = &backoffHistory{
			duration: time.Duration(0),
		}
	} else if h.duration < MinBackoffDelay {
		h.duration = MinBackoffDelay
	} else if h.duration < MaxBackoffDelay {
		h.duration = time.Duration(BackoffMultiplier * h.duration)
Review comment: can we add some jitter?
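An illustrative sketch of one way jitter could be added (not part of this diff; the withJitter helper name and the [d/2, 3d/2) range are assumptions):

	// withJitter spreads a delay over [d/2, 3d/2) so that many peers
	// backing off at the same time do not retry in lockstep.
	// Requires "math/rand".
	func withJitter(d time.Duration) time.Duration {
		if d <= 0 {
			return d
		}
		return d/2 + time.Duration(rand.Int63n(int64(d)))
	}

Applying it to the returned delay rather than to the stored h.duration would keep the stored doubling schedule deterministic while still de-synchronizing the actual retries. The diff continues below.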
		if h.duration > MaxBackoffDelay || h.duration < 0 {
			h.duration = MaxBackoffDelay
		}
	}

	h.lastTried = time.Now()
Review comment: let's get the time after checking the max attempts; that will avoid the gettimeofday call in that case.
Reply: Please read my reply to the comment below, as this part has changed.
	b.info[id] = h

	if len(b.info) > b.ct {
		b.cleanup()
	}

Review comment: nit: if we have a lot of peers (more than ct), this inline cleanup can get expensive. For example, if we maintained an additional heap sorted by expiration time, then at every call to this function we could pop the expired entries from the heap one by one and remove them from the map. This would require us to track an explicit expiration time in each backoffHistory. Will leave it to @vyzo to decide whether this is necessary.
Reply: Let's see if it is actually a problem in practice.
Reply: Actually, it might be problematic; let's run a background goroutine that does it periodically (say once a minute).
Reply: Added in a9f4edf
	return h.duration
}

func (b *backoff) cleanup() {
	for id, h := range b.info {
		if time.Since(h.lastTried) > TimeToLive {
			delete(b.info, id)
		}
	}
}
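The thread above ends with a periodic background cleaner being added in a9f4edf. A minimal sketch of what such a loop could look like (assumptions: the cleanupLoop name, a context for shutdown, and a caller-chosen interval; it also needs the "context" import, and it must take the mutex itself, since cleanup expects the caller to hold it):

	// cleanupLoop periodically evicts expired backoff records until the
	// context is cancelled (i.e. until pubsub shuts down).
	func (b *backoff) cleanupLoop(ctx context.Context, interval time.Duration) {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return // pubsub is shutting down
			case <-ticker.C:
				b.mu.Lock()
				b.cleanup()
				b.mu.Unlock()
			}
		}
	}

Started once, for example with go b.cleanupLoop(ctx, time.Minute) as suggested in the thread, this would replace the size-threshold check inside updateAndGet.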
@@ -0,0 +1,82 @@
package pubsub

import (
	"fmt"
	"math"
	"testing"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
)

func TestBackoff_Update(t *testing.T) {
	id1 := peer.ID("peer-1")
	id2 := peer.ID("peer-2")
	b := newBackoff(10)

	if len(b.info) > 0 {
		t.Fatal("non-empty info map for backoff")
	}

	if d := b.updateAndGet(id1); d != time.Duration(0) {
		t.Fatalf("invalid initialization: %v", d)
	}
	if d := b.updateAndGet(id2); d != time.Duration(0) {
		t.Fatalf("invalid initialization: %v", d)
	}

	for i := 0; i < 10; i++ {
		got := b.updateAndGet(id1)

		expected := time.Duration(math.Pow(BackoffMultiplier, float64(i)) * float64(MinBackoffDelay))
		if expected > MaxBackoffDelay {
			expected = MaxBackoffDelay
		}

		if expected != got {
			t.Fatalf("invalid backoff result, expected: %v, got: %v", expected, got)
		}
	}

	got := b.updateAndGet(id2)
	if got != MinBackoffDelay {
		t.Fatalf("invalid backoff result, expected: %v, got: %v", MinBackoffDelay, got)
	}

	// sets lastTried of id2 so far in the past that its history resets upon the next try.
	b.info[id2].lastTried = time.Now().Add(-TimeToLive)
	got = b.updateAndGet(id2)
	if got != time.Duration(0) {
		t.Fatalf("invalid ttl expiration, expected: %v, got: %v", time.Duration(0), got)
	}

	if len(b.info) != 2 {
		t.Fatalf("info map size mismatch, expected: %d, got: %d", 2, len(b.info))
	}
}

func TestBackoff_Clean(t *testing.T) {
	size := 10
	b := newBackoff(size)

	for i := 0; i < size; i++ {
		id := peer.ID(fmt.Sprintf("peer-%d", i))
		b.updateAndGet(id)
		b.info[id].lastTried = time.Now().Add(-TimeToLive) // enforces expiry
	}

	if len(b.info) != size {
		t.Fatalf("info map size mismatch, expected: %d, got: %d", size, len(b.info))
	}

	// the next update should trigger the cleanup.
	got := b.updateAndGet(peer.ID("some-new-peer"))
	if got != time.Duration(0) {
		t.Fatalf("invalid backoff result, expected: %v, got: %v", time.Duration(0), got)
	}

	// except for "some-new-peer", every other record must have been cleaned up.
	if len(b.info) != 1 {
		t.Fatalf("info map size mismatch, expected: %d, got: %d", 1, len(b.info))
	}
}
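For reference, with the constants above, successive updateAndGet calls for a continuously retried peer yield 0, 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, 6.4s, and then 10s from there on: doubling from MinBackoffDelay and saturating at MaxBackoffDelay. This is exactly the sequence the loop in TestBackoff_Update asserts.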
@@ -4,6 +4,7 @@ import (
"bufio" | ||
"context" | ||
"io" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p-core/network" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
|
@@ -121,6 +122,16 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
 	}
 }
+
+func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
+	delay := p.deadPeerBackoff.updateAndGet(pid)
Review comment: I think we need to add a failure mode for when we have backed off too much, and simply give up; say we try up to 10 times and then stop retrying. How does that sound?
Reply: maybe 10 is even too much, 3-4 attempts should be enough.
Reply: Done
+	select {
+	case <-time.After(delay):
+		p.handleNewPeer(ctx, pid, outgoing)
+	case <-ctx.Done():
+		return
+	}
+}
+
 func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
 	pid := s.Conn().RemotePeer()
 	r := protoio.NewDelimitedReader(s, p.maxMessageSize)
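The failure mode agreed on in the thread above would give up on a peer after a few attempts. A sketch of what the caller side might then look like, assuming updateAndGet is extended to count attempts and return an error once a cap of 3-4 is exceeded, and that the package-level log is used for the message (both assumptions; the final commits may differ):

	func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
		delay, err := p.deadPeerBackoff.updateAndGet(pid)
		if err != nil {
			// too many attempts: give up on this peer instead of retrying forever.
			log.Debugf("backoff attempts to %s exceeded, giving up", pid)
			return
		}

		select {
		case <-time.After(delay):
			p.handleNewPeer(ctx, pid, outgoing)
		case <-ctx.Done():
			return
		}
	}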
Review comment: let's write this if/else sequence with a switch, it will be nicer.
Reply: 7f815f0
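For reference, a mechanical rewrite of updateAndGet's if/else chain as a switch, in the direction the comment suggests (the actual change landed in 7f815f0 and may differ in details):

	h, ok := b.info[id]
	switch {
	case !ok || time.Since(h.lastTried) > TimeToLive:
		// first request goes immediately.
		h = &backoffHistory{
			duration: time.Duration(0),
		}
	case h.duration < MinBackoffDelay:
		h.duration = MinBackoffDelay
	case h.duration < MaxBackoffDelay:
		h.duration = time.Duration(BackoffMultiplier * h.duration)
		if h.duration > MaxBackoffDelay || h.duration < 0 {
			h.duration = MaxBackoffDelay
		}
	}

An expressionless switch evaluates its cases top to bottom and runs only the first that matches, so the behavior is identical to the original chain.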