[7.x] Implement backoff for Kafka output (#17808) #18260

Merged · 3 commits · May 5, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -276,6 +276,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD diff]
- Add keystore support for autodiscover static configurations. {pull}16306[16306]
- Add config example of how to skip the `add_host_metadata` processor when forwarding logs. {issue}13920[13920] {pull}18153[18153]
- When using the `decode_json_fields` processor, decoded fields are now deep-merged into existing event. {pull}17958[17958]
- Add backoff configuration options for the Kafka output. {issue}16777[16777] {pull}17808[17808]

*Auditbeat*

11 changes: 11 additions & 0 deletions auditbeat/auditbeat.reference.yml
@@ -734,6 +734,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to republish to Kafka
# after a network error. After waiting backoff.init seconds, the Beat
# tries to republish. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful publish, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to republish to
# Kafka after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Kafka request. The default
# is 2048.
#bulk_max_size: 2048
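The comment block above (repeated in each Beat's reference config below) describes an exponential schedule with a cap and a reset on success. A minimal sketch in Go of the documented defaults (`backoff.init: 1s`, `backoff.max: 60s`); this illustrates the described behavior and is not libbeat code:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Documented defaults: backoff.init = 1s, backoff.max = 60s.
	initial, max := 1*time.Second, 60*time.Second

	wait := initial
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("failed attempt %d: wait %v before retrying\n", attempt, wait)
		wait *= 2 // grow exponentially after each failure...
		if wait > max {
			wait = max // ...capped at backoff.max
		}
	}
	// Prints 1s, 2s, 4s, 8s, 16s, 32s, 1m0s, 1m0s.
	// A successful publish resets the timer back to backoff.init.
}
```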
11 changes: 11 additions & 0 deletions filebeat/filebeat.reference.yml
@@ -1447,6 +1447,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to republish to Kafka
# after a network error. After waiting backoff.init seconds, the Beat
# tries to republish. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful publish, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to republish to
# Kafka after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Kafka request. The default
# is 2048.
#bulk_max_size: 2048
11 changes: 11 additions & 0 deletions heartbeat/heartbeat.reference.yml
@@ -884,6 +884,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to republish to Kafka
# after a network error. After waiting backoff.init seconds, the Beat
# tries to republish. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful publish, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to republish to
# Kafka after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Kafka request. The default
# is 2048.
#bulk_max_size: 2048
11 changes: 11 additions & 0 deletions journalbeat/journalbeat.reference.yml
@@ -671,6 +671,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to republish to Kafka
# after a network error. After waiting backoff.init seconds, the Beat
# tries to republish. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful publish, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to republish to
# Kafka after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Kafka request. The default
# is 2048.
#bulk_max_size: 2048
11 changes: 11 additions & 0 deletions libbeat/_meta/config/output-kafka.reference.yml.tmpl
@@ -70,6 +70,17 @@
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to republish to Kafka
# after a network error. After waiting backoff.init seconds, the Beat
# tries to republish. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful publish, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to republish to
# Kafka after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Kafka request. The default
# is 2048.
#bulk_max_size: 2048
3 changes: 3 additions & 0 deletions libbeat/common/backoff/backoff.go
@@ -19,7 +19,10 @@ package backoff

// Backoff defines the interface for backoff strategies.
type Backoff interface {
// Wait blocks for a duration of time governed by the backoff strategy.
Wait() bool

// Reset resets the backoff duration to an initial value governed by the backoff strategy.
Reset()
}

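For orientation, a strategy satisfying this two-method contract can be very small. The fixed-duration type and its done-channel cancellation below are assumptions made for illustration, not the jittered implementations libbeat actually ships:

```go
package backoffsketch

import "time"

// fixed is a hypothetical Backoff strategy that always waits the same
// duration. It exists only to illustrate the Wait/Reset contract.
type fixed struct {
	d    time.Duration
	done <-chan struct{} // assumed cancellation signal
}

// Wait blocks for d, returning false if cancelled first.
func (b *fixed) Wait() bool {
	select {
	case <-b.done:
		return false
	case <-time.After(b.d):
		return true
	}
}

// Reset is a no-op: a fixed-duration strategy has no state to rewind.
func (b *fixed) Reset() {}
```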
41 changes: 41 additions & 0 deletions libbeat/internal/testutil/util.go
@@ -0,0 +1,41 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This file contains commonly-used utility functions for testing.

package testutil

import (
"flag"
"math/rand"
"testing"
"time"
)

var (
SeedFlag = flag.Int64("seed", 0, "Randomization seed")
)

func SeedPRNG(t *testing.T) {
seed := *SeedFlag
if seed == 0 {
seed = time.Now().UnixNano()
}

t.Logf("reproduce test with `go test ... -seed %v`", seed)
rand.Seed(seed)
}
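A test adopting this helper would call it before drawing any random values, so a failing run can be replayed deterministically; `TestMyRandomizedBehavior` below is a made-up example:

```go
package mypackage_test

import (
	"math/rand"
	"testing"

	"github.com/elastic/beats/v7/libbeat/internal/testutil"
)

// TestMyRandomizedBehavior is a hypothetical randomized test.
func TestMyRandomizedBehavior(t *testing.T) {
	testutil.SeedPRNG(t) // seeds the global PRNG and logs the seed used

	input := rand.Intn(1000) // randomized test input
	_ = input
	// ... assertions over the randomized input ...
}
```

As the logged message suggests, a failure can then be reproduced with `go test ... -seed <value>`.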
42 changes: 37 additions & 5 deletions libbeat/outputs/kafka/config.go
@@ -20,6 +20,8 @@ package kafka
import (
"errors"
"fmt"
"math"
"math/rand"
"strings"
"time"

@@ -37,6 +39,11 @@ import (
"github.com/elastic/beats/v7/libbeat/outputs/codec"
)

type backoffConfig struct {
Init time.Duration `config:"init"`
Max time.Duration `config:"max"`
}

type kafkaConfig struct {
Hosts []string `config:"hosts" validate:"required"`
TLS *tlscommon.Config `config:"ssl"`
@@ -55,6 +62,7 @@ type kafkaConfig struct {
BulkMaxSize int `config:"bulk_max_size"`
BulkFlushFrequency time.Duration `config:"bulk_flush_frequency"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
Backoff backoffConfig `config:"backoff"`
ClientID string `config:"client_id"`
ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
Username string `config:"username"`
@@ -106,10 +114,14 @@ func defaultConfig() kafkaConfig {
CompressionLevel: 4,
Version: kafka.Version("1.0.0"),
MaxRetries: 3,
ClientID: "beats",
ChanBufferSize: 256,
Username: "",
Password: "",
Backoff: backoffConfig{
Init: 1 * time.Second,
Max: 60 * time.Second,
},
ClientID: "beats",
ChanBufferSize: 256,
Username: "",
Password: "",
}
}

@@ -225,7 +237,7 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, error) {
retryMax = 1000
}
k.Producer.Retry.Max = retryMax
// TODO: k.Producer.Retry.Backoff = ?
k.Producer.Retry.BackoffFunc = makeBackoffFunc(config.Backoff)

// configure per broker go channel buffering
k.ChannelBufferSize = config.ChanBufferSize
@@ -260,3 +272,23 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, error) {
}
return k, nil
}

// makeBackoffFunc returns a stateless implementation of exponential-backoff-with-jitter. It is conceptually
// equivalent to the stateful implementation used by other outputs, EqualJitterBackoff.
func makeBackoffFunc(cfg backoffConfig) func(retries, maxRetries int) time.Duration {
maxBackoffRetries := int(math.Ceil(math.Log2(float64(cfg.Max) / float64(cfg.Init))))

return func(retries, _ int) time.Duration {
// compute 'base' duration for exponential backoff
dur := cfg.Max
if retries < maxBackoffRetries {
dur = time.Duration(uint64(cfg.Init) * uint64(1<<retries))
}

// apply approximately equally distributed jitter in the second half of the interval,
// such that the wait time falls into the interval [dur/2, dur]
limit := int64(dur / 2)
jitter := rand.Int63n(limit + 1)
return time.Duration(limit + jitter)
}
}
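To make the jitter bounds concrete: for retry `r` the pre-jitter base is `min(Init·2^r, Max)`, and the returned duration is drawn from `[base/2, base]`. A sketch exercising the function with the default configuration, placed in the same package since `makeBackoffFunc` is unexported (an illustration, not shipped code):

```go
package kafka

import (
	"fmt"
	"time"
)

// demoBackoff prints sample waits produced by the backoff function.
func demoBackoff() {
	fn := makeBackoffFunc(backoffConfig{Init: time.Second, Max: 60 * time.Second})
	for retries := 1; retries <= 7; retries++ {
		// The second argument (sarama's configured max retries) is unused
		// by this implementation.
		d := fn(retries, 1000)
		fmt.Printf("retry %d: wait %v\n", retries, d)
		// Expected ranges: retry 1 in [1s, 2s], retry 2 in [2s, 4s], ...,
		// retry 6 and beyond in [30s, 60s].
	}
}
```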
34 changes: 34 additions & 0 deletions libbeat/outputs/kafka/config_test.go
@@ -18,9 +18,13 @@
package kafka

import (
"fmt"
"math"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
"github.com/elastic/beats/v7/libbeat/logp"
)

@@ -97,3 +101,33 @@ func TestConfigInvalid(t *testing.T) {
})
}
}

func TestBackoffFunc(t *testing.T) {
testutil.SeedPRNG(t)
tests := map[int]backoffConfig{
15: {Init: 1 * time.Second, Max: 60 * time.Second},
7: {Init: 2 * time.Second, Max: 20 * time.Second},
4: {Init: 5 * time.Second, Max: 7 * time.Second},
}

for numRetries, backoffCfg := range tests {
t.Run(fmt.Sprintf("%v_retries", numRetries), func(t *testing.T) {
backoffFn := makeBackoffFunc(backoffCfg)

prevBackoff := backoffCfg.Init
for retries := 1; retries <= numRetries; retries++ {
backoff := prevBackoff * 2

expectedBackoff := math.Min(float64(backoff), float64(backoffCfg.Max))
actualBackoff := backoffFn(retries, 50)

if !((expectedBackoff/2 <= float64(actualBackoff)) && (float64(actualBackoff) <= expectedBackoff)) {
t.Fatalf("backoff '%v' not in expected range [%v, %v] (retries: %v)", actualBackoff, expectedBackoff/2, expectedBackoff, retries)
}

prevBackoff = backoff
}

})
}
}
3 changes: 2 additions & 1 deletion libbeat/publisher/pipeline/controller_test.go
@@ -25,6 +25,7 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
@@ -43,7 +44,7 @@ func TestOutputReload(t *testing.T) {

for name, ctor := range tests {
t.Run(name, func(t *testing.T) {
seedPRNG(t)
testutil.SeedPRNG(t)

goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)
7 changes: 4 additions & 3 deletions libbeat/publisher/pipeline/output_test.go
@@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
@@ -42,7 +43,7 @@ func TestMakeClientWorker(t *testing.T) {

for name, ctor := range tests {
t.Run(name, func(t *testing.T) {
seedPRNG(t)
testutil.SeedPRNG(t)

err := quick.Check(func(i uint) bool {
numBatches := 300 + (i % 100) // between 300 and 399
@@ -96,7 +97,7 @@ func TestReplaceClientWorker(t *testing.T) {

for name, ctor := range tests {
t.Run(name, func(t *testing.T) {
seedPRNG(t)
testutil.SeedPRNG(t)

err := quick.Check(func(i uint) bool {
numBatches := 1000 + (i % 100) // between 1000 and 1099
@@ -182,7 +183,7 @@ func TestReplaceClientWorker(t *testing.T) {
}

func TestMakeClientTracer(t *testing.T) {
seedPRNG(t)
testutil.SeedPRNG(t)

numBatches := 10
numEvents := atomic.MakeUint(0)
16 changes: 0 additions & 16 deletions libbeat/publisher/pipeline/testing.go
@@ -19,21 +19,15 @@ package pipeline

import (
"context"
"flag"
"math/rand"
"sync"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

var (
SeedFlag = flag.Int64("seed", 0, "Randomization seed")
)

type mockPublishFn func(publisher.Batch) error

func newMockClient(publishFn mockPublishFn) outputs.Client {
@@ -158,16 +152,6 @@ func randIntBetween(min, max int) int {
return rand.Intn(max-min) + min
}

func seedPRNG(t *testing.T) {
seed := *SeedFlag
if seed == 0 {
seed = time.Now().UnixNano()
}

t.Logf("reproduce test with `go test ... -seed %v`", seed)
rand.Seed(seed)
}

func waitUntilTrue(duration time.Duration, fn func() bool) bool {
end := time.Now().Add(duration)
for time.Now().Before(end) {
11 changes: 11 additions & 0 deletions metricbeat/metricbeat.reference.yml
@@ -1499,6 +1499,17 @@ output.elasticsearch:
# until all events are published. The default is 3.
#max_retries: 3

# The number of seconds to wait before trying to republish to Kafka
# after a network error. After waiting backoff.init seconds, the Beat
# tries to republish. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful publish, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to republish to
# Kafka after a network error. The default is 60s.
#backoff.max: 60s

# The maximum number of events to bulk in a single Kafka request. The default
# is 2048.
#bulk_max_size: 2048