Skip to content

Commit

Permalink
feat: Add backoff to flush op (#13140)
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana committed Jul 2, 2024
1 parent 1b7071c commit 2ac9185
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 59 deletions.
66 changes: 41 additions & 25 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -944,10 +944,6 @@ alertmanager_client:
# values:
#
# Secure Ciphers:
# - TLS_RSA_WITH_AES_128_CBC_SHA
# - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_GCM_SHA256
# - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_AES_128_GCM_SHA256
# - TLS_AES_256_GCM_SHA384
# - TLS_CHACHA20_POLY1305_SHA256
Expand All @@ -965,7 +961,11 @@ alertmanager_client:
# Insecure Ciphers:
# - TLS_RSA_WITH_RC4_128_SHA
# - TLS_RSA_WITH_3DES_EDE_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA
# - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA256
# - TLS_RSA_WITH_AES_128_GCM_SHA256
# - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
Expand Down Expand Up @@ -1219,10 +1219,6 @@ evaluation:
# values:
#
# Secure Ciphers:
# - TLS_RSA_WITH_AES_128_CBC_SHA
# - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_GCM_SHA256
# - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_AES_128_GCM_SHA256
# - TLS_AES_256_GCM_SHA384
# - TLS_CHACHA20_POLY1305_SHA256
Expand All @@ -1240,7 +1236,11 @@ evaluation:
# Insecure Ciphers:
# - TLS_RSA_WITH_RC4_128_SHA
# - TLS_RSA_WITH_3DES_EDE_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA
# - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA256
# - TLS_RSA_WITH_AES_128_GCM_SHA256
# - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
Expand Down Expand Up @@ -1442,7 +1442,23 @@ lifecycler:
# CLI flag: -ingester.flush-check-period
[flush_check_period: <duration> | default = 30s]
# The timeout before a flush is cancelled.
flush_op_backoff:
# Minimum backoff period when a flush fails. Each concurrent flush has its own
# backoff, see `ingester.concurrent-flushes`.
# CLI flag: -ingester.flush-op-backoff-min-period
[min_period: <duration> | default = 10s]

# Maximum backoff period when a flush fails. Each concurrent flush has its own
# backoff, see `ingester.concurrent-flushes`.
# CLI flag: -ingester.flush-op-backoff-max-period
[max_period: <duration> | default = 1m]

# Maximum retries for failed flushes.
# CLI flag: -ingester.flush-op-backoff-retries
[max_retries: <int> | default = 10]

# The timeout for an individual flush. Will be retried up to
# `flush-op-backoff-retries` times.
# CLI flag: -ingester.flush-op-timeout
[flush_op_timeout: <duration> | default = 10m]

Expand Down Expand Up @@ -3330,10 +3346,6 @@ Configuration for an ETCD v3 client. Only applies if the selected kvstore is `et
# Override the default cipher suite list (separated by commas). Allowed values:
#
# Secure Ciphers:
# - TLS_RSA_WITH_AES_128_CBC_SHA
# - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_GCM_SHA256
# - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_AES_128_GCM_SHA256
# - TLS_AES_256_GCM_SHA384
# - TLS_CHACHA20_POLY1305_SHA256
Expand All @@ -3351,7 +3363,11 @@ Configuration for an ETCD v3 client. Only applies if the selected kvstore is `et
# Insecure Ciphers:
# - TLS_RSA_WITH_RC4_128_SHA
# - TLS_RSA_WITH_3DES_EDE_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA
# - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA256
# - TLS_RSA_WITH_AES_128_GCM_SHA256
# - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
Expand Down Expand Up @@ -3538,10 +3554,6 @@ When a memberlist config with atleast 1 join_members is defined, kvstore of type
# Override the default cipher suite list (separated by commas). Allowed values:
#
# Secure Ciphers:
# - TLS_RSA_WITH_AES_128_CBC_SHA
# - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_GCM_SHA256
# - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_AES_128_GCM_SHA256
# - TLS_AES_256_GCM_SHA384
# - TLS_CHACHA20_POLY1305_SHA256
Expand All @@ -3559,7 +3571,11 @@ When a memberlist config with atleast 1 join_members is defined, kvstore of type
# Insecure Ciphers:
# - TLS_RSA_WITH_RC4_128_SHA
# - TLS_RSA_WITH_3DES_EDE_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA
# - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA256
# - TLS_RSA_WITH_AES_128_GCM_SHA256
# - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
Expand Down Expand Up @@ -3672,10 +3688,6 @@ backoff_config:
# Override the default cipher suite list (separated by commas). Allowed values:
#
# Secure Ciphers:
# - TLS_RSA_WITH_AES_128_CBC_SHA
# - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_GCM_SHA256
# - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_AES_128_GCM_SHA256
# - TLS_AES_256_GCM_SHA384
# - TLS_CHACHA20_POLY1305_SHA256
Expand All @@ -3693,7 +3705,11 @@ backoff_config:
# Insecure Ciphers:
# - TLS_RSA_WITH_RC4_128_SHA
# - TLS_RSA_WITH_3DES_EDE_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA
# - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA256
# - TLS_RSA_WITH_AES_128_GCM_SHA256
# - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
Expand Down Expand Up @@ -3754,10 +3770,6 @@ The TLS configuration.
# Override the default cipher suite list (separated by commas). Allowed values:
#
# Secure Ciphers:
# - TLS_RSA_WITH_AES_128_CBC_SHA
# - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_GCM_SHA256
# - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_AES_128_GCM_SHA256
# - TLS_AES_256_GCM_SHA384
# - TLS_CHACHA20_POLY1305_SHA256
Expand All @@ -3775,7 +3787,11 @@ The TLS configuration.
# Insecure Ciphers:
# - TLS_RSA_WITH_RC4_128_SHA
# - TLS_RSA_WITH_3DES_EDE_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA
# - TLS_RSA_WITH_AES_256_CBC_SHA
# - TLS_RSA_WITH_AES_128_CBC_SHA256
# - TLS_RSA_WITH_AES_128_GCM_SHA256
# - TLS_RSA_WITH_AES_256_GCM_SHA384
# - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_RC4_128_SHA
# - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
Expand Down
37 changes: 28 additions & 9 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ import (
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/tenant"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/net/context"

"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/util"
Expand Down Expand Up @@ -126,8 +127,9 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo
}

func (i *Ingester) flushLoop(j int) {
l := log.With(util_log.Logger, "loop", j)
defer func() {
level.Debug(util_log.Logger).Log("msg", "Ingester.flushLoop() exited")
level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
i.flushQueuesDone.Done()
}()

Expand All @@ -138,9 +140,10 @@ func (i *Ingester) flushLoop(j int) {
}
op := o.(*flushOp)

err := i.flushUserSeries(op.userID, op.fp, op.immediate)
m := util_log.WithUserID(op.userID, l)
err := i.flushOp(m, op)
if err != nil {
level.Error(util_log.WithUserID(op.userID, util_log.Logger)).Log("msg", "failed to flush", "err", err)
level.Error(m).Log("msg", "failed to flush", "err", err)
}

// If we're exiting & we failed to flush, put the failed operation
Expand All @@ -152,7 +155,23 @@ func (i *Ingester) flushLoop(j int) {
}
}

func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

b := backoff.New(ctx, i.cfg.FlushOpBackoff)
for b.Ongoing() {
err := i.flushUserSeries(ctx, op.userID, op.fp, op.immediate)
if err == nil {
break
}
level.Error(l).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err)
b.Wait()
}
return b.Err()
}

func (i *Ingester) flushUserSeries(ctx context.Context, userID string, fp model.Fingerprint, immediate bool) error {
instance, ok := i.getInstanceByID(userID)
if !ok {
return nil
Expand All @@ -166,9 +185,9 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
lbs := labels.String()
level.Info(util_log.Logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs)

ctx := user.InjectOrgID(context.Background(), userID)
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel()
ctx = user.InjectOrgID(ctx, userID)
ctx, cancelFunc := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancelFunc()
err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx)
if err != nil {
return fmt.Errorf("failed to flush chunks: %w, num_chunks: %d, labels: %s", err, len(chunks), lbs)
Expand Down
66 changes: 66 additions & 0 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ingester

import (
"errors"
"fmt"
"os"
"sort"
Expand Down Expand Up @@ -100,6 +101,67 @@ func Benchmark_FlushLoop(b *testing.B) {
}
}

func Test_FlushOp(t *testing.T) {
t.Run("no error", func(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushOpBackoff.MinBackoff = time.Second
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushCheckPeriod = 100 * time.Millisecond

_, ing := newTestStore(t, cfg, nil)

ctx := user.InjectOrgID(context.Background(), "foo")
ins, err := ing.GetOrCreateInstance("foo")
require.NoError(t, err)

lbs := makeRandomLabels()
req := &logproto.PushRequest{Streams: []logproto.Stream{{
Labels: lbs.String(),
Entries: entries(5, time.Now()),
}}}
require.NoError(t, ins.Push(ctx, req))

time.Sleep(cfg.FlushCheckPeriod)
require.NoError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
immediate: true,
userID: "foo",
fp: ins.getHashForLabels(lbs),
}))
})

t.Run("max retries exceeded", func(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushOpBackoff.MinBackoff = time.Second
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushCheckPeriod = 100 * time.Millisecond

store, ing := newTestStore(t, cfg, nil)
store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
return errors.New("failed to write chunks")
}

ctx := user.InjectOrgID(context.Background(), "foo")
ins, err := ing.GetOrCreateInstance("foo")
require.NoError(t, err)

lbs := makeRandomLabels()
req := &logproto.PushRequest{Streams: []logproto.Stream{{
Labels: lbs.String(),
Entries: entries(5, time.Now()),
}}}
require.NoError(t, ins.Push(ctx, req))

time.Sleep(cfg.FlushCheckPeriod)
require.EqualError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
immediate: true,
userID: "foo",
fp: ins.getHashForLabels(lbs),
}), "terminated after 1 retries")
})
}

func Test_Flush(t *testing.T) {
var (
store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil)
Expand Down Expand Up @@ -295,6 +357,10 @@ func defaultIngesterTestConfig(t testing.TB) Config {

cfg := Config{}
flagext.DefaultValues(&cfg)
cfg.FlushOpBackoff.MinBackoff = 100 * time.Millisecond
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushOpTimeout = 15 * time.Second
cfg.FlushCheckPeriod = 99999 * time.Hour
cfg.MaxChunkIdle = 99999 * time.Hour
cfg.ConcurrentFlushes = 1
Expand Down
17 changes: 14 additions & 3 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/multierror"
Expand Down Expand Up @@ -80,6 +81,7 @@ type Config struct {

ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
Expand Down Expand Up @@ -123,7 +125,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 0, "Number of times to try and transfer chunks before falling back to flushing. If set to 0 or negative value, transfers are disabled.")
f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is cancelled.")
f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester.flush-op-backoff-min-period", 10*time.Second, "Minimum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester.flush-op-backoff-retries", 10, "Maximum retries for failed flushes.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.")
f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
Expand Down Expand Up @@ -151,8 +156,14 @@ func (cfg *Config) Validate() error {
return err
}

if cfg.MaxTransferRetries > 0 && cfg.WAL.Enabled {
return errors.New("the use of the write ahead log (WAL) is incompatible with chunk transfers. It's suggested to use the WAL. Please try setting ingester.max-transfer-retries to 0 to disable transfers")
if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff {
return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
}
if cfg.FlushOpBackoff.MaxRetries <= 0 {
return fmt.Errorf("invalid flush op max retries: %d", cfg.FlushOpBackoff.MaxRetries)
}
if cfg.FlushOpTimeout <= 0 {
return fmt.Errorf("invalid flush op timeout: %s", cfg.FlushOpTimeout)
}

if cfg.IndexShards <= 0 {
Expand Down
Loading

0 comments on commit 2ac9185

Please sign in to comment.