Skip to content

Commit

Permalink
Add backoff to flush op
Browse files Browse the repository at this point in the history
This commit adds a configurable backoff to flush ops in the ingester.
This is to prevent situations where the store put operation fails fast
(i.e. 401 Unauthorized) and can cause ingesters to be rate limited.
  • Loading branch information
grobinson-grafana committed Jun 6, 2024
1 parent 21dd4af commit e661dcf
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 31 deletions.
18 changes: 17 additions & 1 deletion docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -2752,7 +2752,23 @@ lifecycler:
# CLI flag: -ingester.flush-check-period
[flush_check_period: <duration> | default = 30s]
# The timeout before a flush is cancelled.
flush_op_backoff:
# Minimum backoff period when a flush fails. Each concurrent flush has its own
# backoff, see `ingester.concurrent-flushes`.
# CLI flag: -ingester.flush-op-backoff-min-period
[min_period: <duration> | default = 10s]

# Maximum backoff period when a flush fails. Each concurrent flush has its own
# backoff, see `ingester.concurrent-flushes`.
# CLI flag: -ingester.flush-op-backoff-max-period
[max_period: <duration> | default = 1m]

# Maximum retries for failed flushes. Is canceled when
# `ingester.flush-op-timeout` is exceeded.
# CLI flag: -ingester.flush-op-backoff-retries
[max_retries: <int> | default = 10]

# The timeout before a flush is canceled.
# CLI flag: -ingester.flush-op-timeout
[flush_op_timeout: <duration> | default = 10m]

Expand Down
48 changes: 41 additions & 7 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package ingester

import (
"bytes"
"errors"
"fmt"
"net/http"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -135,8 +138,9 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo
}

func (i *Ingester) flushLoop(j int) {
l := log.With(i.logger, "loop", j)
defer func() {
level.Debug(i.logger).Log("msg", "Ingester.flushLoop() exited")
level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
i.flushQueuesDone.Done()
}()

Expand All @@ -147,9 +151,10 @@ func (i *Ingester) flushLoop(j int) {
}
op := o.(*flushOp)

err := i.flushUserSeries(op.userID, op.fp, op.immediate)
m := util_log.WithUserID(op.userID, l)
err := i.flushOp(m, op)
if err != nil {
level.Error(util_log.WithUserID(op.userID, i.logger)).Log("msg", "failed to flush", "err", err)
level.Error(m).Log("msg", "failed to flush", "err", err)
}

// If we're exiting & we failed to flush, put the failed operation
Expand All @@ -161,7 +166,39 @@ func (i *Ingester) flushLoop(j int) {
}
}

func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
// A flush is retried until either it is successful, the maximum number
// of retries is exceeded, or the timeout has expired. The context is
// used to cancel the backoff should the latter happen.
ctx, cancelFunc := context.WithTimeout(context.Background(), i.cfg.FlushOpTimeout)
defer cancelFunc()

b := backoff.New(ctx, i.cfg.FlushOpBackoff)
for b.Ongoing() {
err := i.flushUserSeries(ctx, op.userID, op.fp, op.immediate)
if err == nil {
break
}
level.Error(l).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err)
b.Wait()
}

if err := b.Err(); err != nil {
// If we got here then either the maximum number of retries have been
// exceeded or the timeout expired. We do not need to check ctx.Err()
// as it is checked in b.Err().
if errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("timed out after %s: %w", i.cfg.FlushOpTimeout, err)
}
return err
}

return nil
}

func (i *Ingester) flushUserSeries(ctx context.Context, userID string, fp model.Fingerprint, immediate bool) error {
ctx = user.InjectOrgID(ctx, userID)

instance, ok := i.getInstanceByID(userID)
if !ok {
return nil
Expand All @@ -175,9 +212,6 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
lbs := labels.String()
level.Info(i.logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs)

ctx := user.InjectOrgID(context.Background(), userID)
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel()
err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx)
if err != nil {
return fmt.Errorf("failed to flush chunks: %w, num_chunks: %d, labels: %s", err, len(chunks), lbs)
Expand Down
99 changes: 99 additions & 0 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ingester

import (
"errors"
"fmt"
"os"
"sort"
Expand Down Expand Up @@ -102,6 +103,100 @@ func Benchmark_FlushLoop(b *testing.B) {
}
}

func Test_FlushOp(t *testing.T) {
t.Run("no error", func(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushOpBackoff.MinBackoff = time.Second
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushCheckPeriod = 100 * time.Millisecond

_, ing := newTestStore(t, cfg, nil)

ctx := user.InjectOrgID(context.Background(), "foo")
ins, err := ing.GetOrCreateInstance("foo")
require.NoError(t, err)

lbs := makeRandomLabels()
req := &logproto.PushRequest{Streams: []logproto.Stream{{
Labels: lbs.String(),
Entries: entries(5, time.Now()),
}}}
require.NoError(t, ins.Push(ctx, req))

time.Sleep(cfg.FlushCheckPeriod)
require.NoError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
immediate: true,
userID: "foo",
fp: ins.getHashForLabels(lbs),
}))
})

t.Run("max retries exceeded", func(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushOpBackoff.MinBackoff = time.Second
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushCheckPeriod = 100 * time.Millisecond

store, ing := newTestStore(t, cfg, nil)
store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
return errors.New("failed to write chunks")
}

ctx := user.InjectOrgID(context.Background(), "foo")
ins, err := ing.GetOrCreateInstance("foo")
require.NoError(t, err)

lbs := makeRandomLabels()
req := &logproto.PushRequest{Streams: []logproto.Stream{{
Labels: lbs.String(),
Entries: entries(5, time.Now()),
}}}
require.NoError(t, ins.Push(ctx, req))

time.Sleep(cfg.FlushCheckPeriod)
require.EqualError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
immediate: true,
userID: "foo",
fp: ins.getHashForLabels(lbs),
}), "terminated after 1 retries")
})

t.Run("timeout expired", func(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushOpBackoff.MinBackoff = time.Second
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushOpTimeout = 100 * time.Millisecond
cfg.FlushCheckPeriod = 100 * time.Millisecond

store, ing := newTestStore(t, cfg, nil)
store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
time.Sleep(150 * time.Millisecond)
return errors.New("store is unavailable")
}

ctx := user.InjectOrgID(context.Background(), "foo")
ins, err := ing.GetOrCreateInstance("foo")
require.NoError(t, err)

lbs := makeRandomLabels()
req := &logproto.PushRequest{Streams: []logproto.Stream{{
Labels: lbs.String(),
Entries: entries(5, time.Now()),
}}}
require.NoError(t, ins.Push(ctx, req))

time.Sleep(cfg.FlushCheckPeriod)
require.EqualError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
immediate: true,
userID: "foo",
fp: ins.getHashForLabels(lbs),
}), "timed out after 100ms: context deadline exceeded")
})
}

func Test_Flush(t *testing.T) {
var (
store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil)
Expand Down Expand Up @@ -297,6 +392,10 @@ func defaultIngesterTestConfig(t testing.TB) Config {

cfg := Config{}
flagext.DefaultValues(&cfg)
cfg.FlushOpBackoff.MinBackoff = 100 * time.Millisecond
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushOpTimeout = 15 * time.Second
cfg.FlushCheckPeriod = 99999 * time.Hour
cfg.MaxChunkIdle = 99999 * time.Hour
cfg.ConcurrentFlushes = 1
Expand Down
16 changes: 15 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/multierror"
Expand Down Expand Up @@ -81,6 +82,7 @@ type Config struct {

ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
Expand Down Expand Up @@ -126,7 +128,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is cancelled.")
f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester.flush-op-backoff-min-period", 10*time.Second, "Minimum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester.flush-op-backoff-retries", 10, "Maximum retries for failed flushes. Is canceled when `ingester.flush-op-timeout` is exceeded.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is canceled.")
f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
Expand Down Expand Up @@ -154,6 +159,15 @@ func (cfg *Config) Validate() error {
return err
}

if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff {
return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
}
if cfg.FlushOpBackoff.MaxRetries <= 0 {
return fmt.Errorf("invalid flush op max retries: %d", cfg.FlushOpBackoff.MaxRetries)
}
if cfg.FlushOpTimeout <= 0 {
return fmt.Errorf("invalid flush op timeout: %s", cfg.FlushOpTimeout)
}
if cfg.IndexShards <= 0 {
return fmt.Errorf("invalid ingester index shard factor: %d", cfg.IndexShards)
}
Expand Down
Loading

0 comments on commit e661dcf

Please sign in to comment.