Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

common.Backoff now implements jitter instead of sleeping for a fixed amount of time #10177

Merged
merged 2 commits into from
Jan 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Enforce validation for the Central Management access token. {issue}9621[9621]
- Fix config appender registration. {pull}9873[9873]
- Gracefully handle TLS options when enrolling a Beat. {issue}9129[9129]
- Backoff now implements jitter to better distribute the load. {issue}10172[10172]

*Auditbeat*

Expand Down
5 changes: 3 additions & 2 deletions journalbeat/reader/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/elastic/beats/journalbeat/config"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/backoff"
"github.com/elastic/beats/libbeat/logp"
)

Expand All @@ -44,7 +45,7 @@ type Reader struct {
config Config
done chan struct{}
logger *logp.Logger
backoff *common.Backoff
backoff backoff.Backoff
}

// New creates a new journal reader and moves the FP to the configured position.
Expand Down Expand Up @@ -98,7 +99,7 @@ func newReader(logger *logp.Logger, done chan struct{}, c Config, journal *sdjou
config: c,
done: done,
logger: logger,
backoff: common.NewBackoff(done, c.Backoff, c.MaxBackoff),
backoff: backoff.NewExpBackoff(done, c.Backoff, c.MaxBackoff),
}
r.seek(state.Cursor)

Expand Down
34 changes: 34 additions & 0 deletions libbeat/common/backoff/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package backoff

// Backoff defines the interface for backoff strategies.
type Backoff interface {
// Wait blocks for the strategy's current backoff period. The implementations
// visible in this package return true once the period has elapsed and false
// when their done channel fires first.
Wait() bool
// Reset restores the wait period to its initial value.
Reset()
}

// WaitOnError is a convenience function around a Backoff. When err is non-nil
// it blocks by calling b.Wait and returns its result. When err is nil it
// resets the backoff and returns true without waiting.
func WaitOnError(b Backoff, err error) bool {
	if err != nil {
		return b.Wait()
	}

	b.Reset()
	return true
}
83 changes: 83 additions & 0 deletions libbeat/common/backoff/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package backoff

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// factory builds a Backoff bound to the given done channel, so the same test
// scenario can be run against each backoff implementation.
type factory func(<-chan struct{}) Backoff

// TestBackoff runs the shared backoff scenarios as named subtests.
func TestBackoff(t *testing.T) {
	scenarios := []struct {
		name string
		run  func(*testing.T)
	}{
		{"test close channel", testCloseChannel},
		{"test unblock after some time", testUnblockAfterInit},
	}

	for _, s := range scenarios {
		t.Run(s.name, s.run)
	}
}

// testCloseChannel checks, for every backoff implementation, that Wait
// returns false when the done channel has already been closed.
func testCloseChannel(t *testing.T) {
	const (
		initial = 2 * time.Second
		ceiling = 5 * time.Minute
	)

	factories := map[string]factory{
		"ExpBackoff": func(done <-chan struct{}) Backoff {
			return NewExpBackoff(done, initial, ceiling)
		},
		"EqualJitterBackoff": func(done <-chan struct{}) Backoff {
			return NewEqualJitterBackoff(done, initial, ceiling)
		},
	}

	for name, newBackoff := range factories {
		t.Run(name, func(t *testing.T) {
			done := make(chan struct{})
			b := newBackoff(done)
			// Close before waiting so the done case is ready immediately.
			close(done)
			assert.False(t, b.Wait())
		})
	}
}

// testUnblockAfterInit checks, for every backoff implementation, that the
// first wait triggered by an error completes successfully and lasts at least
// the initial backoff period.
func testUnblockAfterInit(t *testing.T) {
	const (
		initial = 1 * time.Second
		ceiling = 5 * time.Minute
	)

	factories := map[string]factory{
		"ExpBackoff": func(done <-chan struct{}) Backoff {
			return NewExpBackoff(done, initial, ceiling)
		},
		"EqualJitterBackoff": func(done <-chan struct{}) Backoff {
			return NewEqualJitterBackoff(done, initial, ceiling)
		},
	}

	for name, newBackoff := range factories {
		t.Run(name, func(t *testing.T) {
			done := make(chan struct{})
			defer close(done)

			b := newBackoff(done)

			begin := time.Now()
			assert.True(t, WaitOnError(b, errors.New("bad bad")))
			assert.True(t, time.Since(begin) >= initial)
		})
	}
}
73 changes: 73 additions & 0 deletions libbeat/common/backoff/equal_jitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package backoff

import (
"math/rand"
"time"
)

// EqualJitterBackoff implements an equal jitter strategy: each wait is made of
// two halves, a fixed half taken from the exponentially growing duration and a
// random half that provides the jitter needed to spread the load on the
// remote endpoint.
type EqualJitterBackoff struct {
// duration is the current backoff window. Wait sleeps for at least
// duration/2 and less than duration, then doubles it, capped at max.
duration time.Duration
// done interrupts a pending Wait: Wait returns false as soon as a receive
// on this channel succeeds (for example when it is closed).
done <-chan struct{}

// init is the base period; duration starts at, and is reset to, init * 2.
init time.Duration
// max is the upper bound applied to duration after each doubling.
max time.Duration

// last is set to the current time whenever Wait completes by timer expiry.
last time.Time
}

// NewEqualJitterBackoff builds an EqualJitterBackoff that is interrupted by
// done, starts from init and caps its growth at max.
func NewEqualJitterBackoff(done <-chan struct{}, init, max time.Duration) Backoff {
	b := new(EqualJitterBackoff)
	b.done = done
	b.init = init
	b.max = max
	// Start with a window of twice init so the first wait sleeps for at
	// least the init period.
	b.duration = 2 * init
	return b
}

// Reset restores the backoff window to its starting size. The window is twice
// init so that the first Wait after a reset sleeps for at least init.
func (b *EqualJitterBackoff) Reset() {
	b.duration = 2 * b.init
}

// Wait blocks until either the backoff timer fires or a receive on the done
// channel succeeds. It returns true when the timer fired and false when the
// done channel won.
func (b *EqualJitterBackoff) Wait() bool {
// Sleep for half of the current duration plus a random amount in
// [0, duration/2), so there is always some minimal backoff and some jitter.
// NOTE(review): rand.Int63n panics when its argument is <= 0, which happens
// here when duration is below 2ns (e.g. a zero init) — confirm callers never
// configure such values.
temp := int64(b.duration / 2)
backoff := time.Duration(temp + rand.Int63n(temp))

// Double the duration for the next wait, capped at max.
b.duration *= 2
if b.duration > b.max {
b.duration = b.max
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: By moving this block before the computation of temp you don't need the init*2 on reset. In the end it doesn't matter :)


select {
case <-b.done:
return false
case <-time.After(backoff):
// Record the completion time only when the timer actually fired.
b.last = time.Now()
return true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
// specific language governing permissions and limitations
// under the License.

package common
package backoff

import "time"
import (
"time"
)

// A Backoff waits on errors with exponential backoff (limited by maximum
// backoff). Resetting Backoff will reset the next sleep timer to the initial
// backoff duration.
type Backoff struct {
// ExpBackoff is an exponential backoff: it waits an initial time and exponentially
// increases the wait time up to a predefined maximum. Resetting Backoff will reset the next sleep
// timer to the initial backoff duration.
type ExpBackoff struct {
duration time.Duration
done <-chan struct{}

Expand All @@ -32,20 +34,23 @@ type Backoff struct {
last time.Time
}

func NewBackoff(done <-chan struct{}, init, max time.Duration) *Backoff {
return &Backoff{
// NewExpBackoff returns a new exponential backoff.
func NewExpBackoff(done <-chan struct{}, init, max time.Duration) Backoff {
return &ExpBackoff{
duration: init,
done: done,
init: init,
max: max,
}
}

func (b *Backoff) Reset() {
// Reset resets the duration of the backoff.
func (b *ExpBackoff) Reset() {
b.duration = b.init
}

func (b *Backoff) Wait() bool {
// Wait blocks until either the timer is completed or the channel is done.
func (b *ExpBackoff) Wait() bool {
backoff := b.duration
b.duration *= 2
if b.duration > b.max {
Expand All @@ -60,24 +65,3 @@ func (b *Backoff) Wait() bool {
return true
}
}

func (b *Backoff) WaitOnError(err error) bool {
if err == nil {
b.Reset()
return true
}
return b.Wait()
}

func (b *Backoff) TryWaitOnError(failTS time.Time, err error) bool {
if err == nil {
b.Reset()
return true
}

if failTS.Before(b.last) {
return true
}

return b.Wait()
}
10 changes: 5 additions & 5 deletions libbeat/outputs/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"errors"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/backoff"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/testing"
)
Expand All @@ -30,13 +30,13 @@ type backoffClient struct {
client NetworkClient

done chan struct{}
backoff *common.Backoff
backoff backoff.Backoff
}

// WithBackoff wraps a NetworkClient, adding exponential backoff support to a network client if connection/publishing failed.
func WithBackoff(client NetworkClient, init, max time.Duration) NetworkClient {
done := make(chan struct{})
backoff := common.NewBackoff(done, init, max)
backoff := backoff.NewEqualJitterBackoff(done, init, max)
return &backoffClient{
client: client,
done: done,
Expand All @@ -46,7 +46,7 @@ func WithBackoff(client NetworkClient, init, max time.Duration) NetworkClient {

func (b *backoffClient) Connect() error {
err := b.client.Connect()
b.backoff.WaitOnError(err)
backoff.WaitOnError(b.backoff, err)
return err
}

Expand All @@ -61,7 +61,7 @@ func (b *backoffClient) Publish(batch publisher.Batch) error {
if err != nil {
b.client.Close()
}
b.backoff.WaitOnError(err)
backoff.WaitOnError(b.backoff, err)
return err
}

Expand Down
6 changes: 3 additions & 3 deletions libbeat/outputs/redis/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/garyburd/redigo/redis"

"github.com/elastic/beats/libbeat/common"
b "github.com/elastic/beats/libbeat/common/backoff"
"github.com/elastic/beats/libbeat/publisher"
)

Expand All @@ -32,7 +32,7 @@ type backoffClient struct {
reason failReason

done chan struct{}
backoff *common.Backoff
backoff b.Backoff
}

// failReason is used to track the cause of an error.
Expand All @@ -51,7 +51,7 @@ const (

func newBackoffClient(client *client, init, max time.Duration) *backoffClient {
done := make(chan struct{})
backoff := common.NewBackoff(done, init, max)
backoff := b.NewEqualJitterBackoff(done, init, max)
return &backoffClient{
client: client,
done: done,
Expand Down
4 changes: 2 additions & 2 deletions x-pack/functionbeat/licenser/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

"github.com/gofrs/uuid"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/backoff"
"github.com/elastic/beats/libbeat/logp"
)

Expand Down Expand Up @@ -253,7 +253,7 @@ func (m *Manager) worker() {
}

func (m *Manager) update() {
backoff := common.NewBackoff(m.done, initBackoff, maxBackoff)
backoff := backoff.NewEqualJitterBackoff(m.done, initBackoff, maxBackoff)
startedAt := time.Now()
for {
select {
Expand Down