Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Add Backoff Cache Discovery #26

Merged
merged 14 commits into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package discovery

import (
"math"
"math/rand"
"time"
)

type BackoffFactory func() BackoffStrategy

type BackoffStrategy interface {
Delay() time.Duration
Reset()
}

// Jitter implementations taken roughly from https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/

// Jitter must return a duration between min and max. Min must be lower than, or equal to, max.
type Jitter func(duration time.Duration, min time.Duration, max time.Duration, rng *rand.Rand) time.Duration
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved

func FullJitter(duration time.Duration, min time.Duration, max time.Duration, rng *rand.Rand) time.Duration {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
if duration <= min {
return min
}

normalizedDur := boundedDuration(duration, min, max) - min

return boundedDuration(time.Duration(rng.Int63n(int64(normalizedDur)))+min, min, max)
}

func NoJitter(duration time.Duration, min time.Duration, max time.Duration, rng *rand.Rand) time.Duration {
return boundedDuration(duration, min, max)
}

type randomizedBackoff struct {
min time.Duration
max time.Duration
rng *rand.Rand
}

func (b *randomizedBackoff) BoundedDelay(duration time.Duration) time.Duration {
return boundedDuration(duration, b.min, b.max)
}

func boundedDuration(d time.Duration, min time.Duration, max time.Duration) time.Duration {
if d < min {
return min
}
if d > max {
return max
}
return d
}

type attemptBackoff struct {
attempt int
jitter Jitter
randomizedBackoff
}

func (b *attemptBackoff) Reset() {
b.attempt = 0
}

func NewFixedBackoff(delay time.Duration) BackoffFactory {
return func() BackoffStrategy {
return &fixedBackoff{delay: delay}
}
}

type fixedBackoff struct {
delay time.Duration
}
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved

func (b *fixedBackoff) Delay() time.Duration {
return b.delay
}

func (b *fixedBackoff) Reset() {}

func NewPolynomialBackoff(min, max time.Duration, jitter Jitter,
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
timeUnits time.Duration, polyCoefs []float64, rng *rand.Rand) BackoffFactory {
return func() BackoffStrategy {
return &polynomialBackoff{
attemptBackoff: attemptBackoff{
randomizedBackoff: randomizedBackoff{
min: min,
max: max,
rng: rng,
},
jitter: jitter,
},
timeUnits: timeUnits,
poly: polyCoefs,
}
}
}

type polynomialBackoff struct {
attemptBackoff
timeUnits time.Duration
poly []float64
}

func (b *polynomialBackoff) Delay() time.Duration {
polySum := b.poly[0]
exp := 1
attempt := b.attempt
b.attempt++

for _, c := range b.poly[1:] {
exp *= attempt
polySum += float64(exp) * c
}
return b.jitter(time.Duration(float64(b.timeUnits)*polySum), b.min, b.max, b.rng)
}

func NewExponentialBackoff(min, max time.Duration, jitter Jitter,
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
timeUnits time.Duration, base float64, offset time.Duration, rng *rand.Rand) BackoffFactory {
return func() BackoffStrategy {
return &exponentialBackoff{
attemptBackoff: attemptBackoff{
randomizedBackoff: randomizedBackoff{
min: min,
max: max,
rng: rng,
},
jitter: jitter,
},
timeUnits: timeUnits,
base: base,
offset: offset,
}
}
}

type exponentialBackoff struct {
attemptBackoff
timeUnits time.Duration
base float64
offset time.Duration
}

func (b *exponentialBackoff) Delay() time.Duration {
attempt := b.attempt
b.attempt++
return b.jitter(
time.Duration(math.Pow(b.base, float64(attempt))*float64(b.timeUnits))+b.offset, b.min, b.max, b.rng)
}

func NewExponentialDecorrelatedJitter(min, max time.Duration, base float64, rng *rand.Rand) BackoffFactory {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
return func() BackoffStrategy {
return &exponentialDecorrelatedJitter{
randomizedBackoff: randomizedBackoff{
min: min,
max: max,
rng: rng,
},
base: base,
}
}
}

type exponentialDecorrelatedJitter struct {
randomizedBackoff
base float64
lastDelay time.Duration
}

func (b *exponentialDecorrelatedJitter) Delay() time.Duration {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
if b.lastDelay < b.min {
b.lastDelay = b.min
return b.lastDelay
}

nextMax := int64(float64(b.lastDelay) * b.base)
b.lastDelay = boundedDuration(time.Duration(b.rng.Int63n(nextMax-int64(b.min)))+b.min, b.min, b.max)
return b.lastDelay
}

func (b *exponentialDecorrelatedJitter) Reset() { b.lastDelay = 0 }
125 changes: 125 additions & 0 deletions backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package discovery

import (
"math/rand"
"testing"
"time"
)

func checkDelay(bkf BackoffStrategy, expected time.Duration, t *testing.T) {
t.Helper()
if calculated := bkf.Delay(); calculated != expected {
t.Fatalf("expected %v, got %v", expected, calculated)
}
}

func TestFixedBackoff(t *testing.T) {
startDelay := time.Second
delay := startDelay

bkf := NewFixedBackoff(delay)
delay *= 2
b1 := bkf()
delay *= 2
b2 := bkf()

if b1.Delay() != startDelay || b2.Delay() != startDelay {
t.Fatal("incorrect delay time")
}

if b1.Delay() != startDelay {
t.Fatal("backoff is stateful")
}

if b1.Reset(); b1.Delay() != startDelay {
t.Fatalf("Reset does something")
}
}

func TestPolynomialBackoff(t *testing.T) {
rng := rand.New(rand.NewSource(0))
bkf := NewPolynomialBackoff(time.Second, time.Second*33, NoJitter, time.Second, []float64{0.5, 2, 3}, rng)
b1 := bkf()
b2 := bkf()

if b1.Delay() != time.Second || b2.Delay() != time.Second {
t.Fatal("incorrect delay time")
}

checkDelay(b1, time.Millisecond*5500, t)
checkDelay(b1, time.Millisecond*16500, t)
checkDelay(b1, time.Millisecond*33000, t)
checkDelay(b2, time.Millisecond*5500, t)

b1.Reset()
b1.Delay()
checkDelay(b1, time.Millisecond*5500, t)
}

func TestExponentialBackoff(t *testing.T) {
rng := rand.New(rand.NewSource(0))
bkf := NewExponentialBackoff(time.Millisecond*650, time.Second*7, NoJitter, time.Second, 1.5, -time.Millisecond*400, rng)
b1 := bkf()
b2 := bkf()

if b1.Delay() != time.Millisecond*650 || b2.Delay() != time.Millisecond*650 {
t.Fatal("incorrect delay time")
}

checkDelay(b1, time.Millisecond*1100, t)
checkDelay(b1, time.Millisecond*1850, t)
checkDelay(b1, time.Millisecond*2975, t)
checkDelay(b1, time.Microsecond*4662500, t)
checkDelay(b1, time.Second*7, t)
checkDelay(b2, time.Millisecond*1100, t)

b1.Reset()
b1.Delay()
checkDelay(b1, time.Millisecond*1100, t)
}

func minMaxJitterTest(jitter Jitter, t *testing.T) {
rng := rand.New(rand.NewSource(0))
if jitter(time.Nanosecond, time.Hour*10, time.Hour*20, rng) < time.Hour*10 {
t.Fatal("Min not working")
}
if jitter(time.Hour, time.Nanosecond, time.Nanosecond*10, rng) > time.Nanosecond*10 {
t.Fatal("Max not working")
}
}

func TestNoJitter(t *testing.T) {
minMaxJitterTest(NoJitter, t)
for i := 0; i < 10; i++ {
expected := time.Second * time.Duration(i)
if calculated := NoJitter(expected, time.Duration(0), time.Second*100, nil); calculated != expected {
t.Fatalf("expected %v, got %v", expected, calculated)
}
}
}

func TestFullJitter(t *testing.T) {
rng := rand.New(rand.NewSource(0))
minMaxJitterTest(FullJitter, t)
const numBuckets = 51
const multiplier = 10
const threshold = 20

histogram := make([]int, numBuckets)

for i := 0; i < (numBuckets-1)*multiplier; i++ {
started := time.Nanosecond * 50
calculated := FullJitter(started, 0, 100, rng)
histogram[calculated]++
}

for _, count := range histogram {
if count > threshold {
t.Fatal("jitter is not close to evenly spread")
}
}

if histogram[numBuckets-1] > 0 {
t.Fatal("jitter increased overall time")
}
}
Loading