From 9c64861e01417b80f9b34a20319e580e0ee73c6f Mon Sep 17 00:00:00 2001 From: mihir Date: Fri, 16 Aug 2024 18:29:59 +0530 Subject: [PATCH 1/2] chore: limit max retry interval for transformer --- processor/transformer/transformer.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/processor/transformer/transformer.go b/processor/transformer/transformer.go index d46fdc8611..5514dace67 100644 --- a/processor/transformer/transformer.go +++ b/processor/transformer/transformer.go @@ -172,6 +172,7 @@ type handle struct { maxRetry config.ValueLoader[int] failOnUserTransformTimeout config.ValueLoader[bool] failOnError config.ValueLoader[bool] + maxRetryBackoffInterval config.ValueLoader[time.Duration] destTransformationURL string userTransformationURL string @@ -203,6 +204,8 @@ func NewTransformer(conf *config.Config, log logger.Logger, stat stats.Stats, op trans.config.failOnUserTransformTimeout = conf.GetReloadableBoolVar(false, "Processor.Transformer.failOnUserTransformTimeout") trans.config.failOnError = conf.GetReloadableBoolVar(false, "Processor.Transformer.failOnError") + trans.config.maxRetryBackoffInterval = conf.GetReloadableDurationVar(30, time.Second, "Processor.Transformer.maxRetryBackoffInterval") + trans.guardConcurrency = make(chan struct{}, trans.config.maxConcurrency) if trans.client == nil { @@ -366,6 +369,7 @@ func (trans *handle) request(ctx context.Context, url, stage string, data []Tran // endless retry if transformer-control plane connection is down endlessBackoff := backoff.NewExponentialBackOff() endlessBackoff.MaxElapsedTime = 0 // no max time -> ends only when no error + endlessBackoff.MaxInterval = trans.config.maxRetryBackoffInterval.Load() // endless backoff loop, only nil error or panics inside _ = backoff.RetryNotify( @@ -439,6 +443,9 @@ func (trans *handle) doPost(ctx context.Context, rawJSON []byte, url, stage stri resp *http.Response respData []byte ) + retryStrategy := backoff.NewExponentialBackOff() + // MaxInterval caps the RetryInterval + retryStrategy.MaxInterval = trans.config.maxRetryBackoffInterval.Load() err := backoff.RetryNotify( func() error { @@ -473,7 +480,7 @@ func (trans *handle) doPost(ctx context.Context, rawJSON []byte, url, stage stri respData, reqErr = io.ReadAll(resp.Body) return reqErr }, - backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(trans.config.maxRetry.Load())), + backoff.WithMaxRetries(retryStrategy, uint64(trans.config.maxRetry.Load())), func(err error, t time.Duration) { retryCount++ trans.logger.Warnn( From 04fdf1abf4cdfc7bd1b1bc37c88a10e3a5a387f9 Mon Sep 17 00:00:00 2001 From: mihir Date: Fri, 16 Aug 2024 18:50:20 +0530 Subject: [PATCH 2/2] fix tests --- processor/transformer/transformer_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/processor/transformer/transformer_test.go b/processor/transformer/transformer_test.go index e952895f47..86ca09247e 100644 --- a/processor/transformer/transformer_test.go +++ b/processor/transformer/transformer_test.go @@ -160,6 +160,8 @@ func TestTransformer(t *testing.T) { tr.config.failOnUserTransformTimeout = config.SingleValueLoader(true) tr.config.failOnError = config.SingleValueLoader(true) + tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) + tr.config.maxRetry = config.SingleValueLoader(1) tc := []struct { @@ -326,6 +328,7 @@ func TestTransformer(t *testing.T) { tr.config.maxRetry = config.SingleValueLoader(tc.retries) tr.config.failOnUserTransformTimeout = config.SingleValueLoader(tc.failOnUserTransformTimeout) tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType) + tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) if tc.expectPanic { require.Panics(t, func() { @@ -386,6 +389,7 @@ func TestTransformer(t *testing.T) { tr.conf = config.Default tr.client = srv.Client() tr.config.maxRetry = config.SingleValueLoader(1) + tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) tr.config.timeoutDuration = 1 * time.Second tr.config.failOnUserTransformTimeout = config.SingleValueLoader(false) tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType) @@ -520,6 +524,7 @@ func TestTransformer(t *testing.T) { tr.config.failOnError = config.SingleValueLoader(tc.failOnError) tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType) tr.config.timeoutDuration = 1 * time.Second + tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) if tc.expectPanic { require.Panics(t, func() { @@ -620,6 +625,7 @@ func TestTransformer(t *testing.T) { tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType) tr.config.maxRetry = config.SingleValueLoader(1) tr.config.timeoutDuration = 1 * time.Second + tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second) if tc.expectPanic { require.Panics(t, func() {