Merge pull request #2124 from alexkats/neg-dt
Introduce dynamic throttling for NEG requests
k8s-ci-robot authored Jun 20, 2023
2 parents 92d5a4e + 4bb68d3 commit 80f2d99
Showing 22 changed files with 1,413 additions and 172 deletions.
2 changes: 1 addition & 1 deletion cmd/glbc/main.go
@@ -364,7 +364,7 @@ func runControllers(ctx *ingctx.ControllerContext) {
ctx.ControllerMetrics,
ctx.L4Namer,
ctx.DefaultBackendSvcPort,
negtypes.NewAdapter(ctx.Cloud),
negtypes.NewAdapterWithRateLimitSpecs(ctx.Cloud, flags.F.GCERateLimit.Values()),
zoneGetter,
ctx.ClusterNamer,
flags.F.ResyncPeriod,
50 changes: 34 additions & 16 deletions pkg/neg/syncers/backoff.go → pkg/backoff/backoff.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package syncers
package backoff

import (
"fmt"
@@ -26,17 +26,19 @@ import (

var ErrRetriesExceeded = fmt.Errorf("maximum retry exceeded")

// backoffHandler handles delays for back off retry
type backoffHandler interface {
// NextRetryDelay returns the delay for next retry or error if maximum number of retries exceeded.
NextRetryDelay() (time.Duration, error)
// ResetRetryDelay resets the retry delay
ResetRetryDelay()
// BackoffHandler handles delays for backoff retries
type BackoffHandler interface {
// NextDelay returns the delay for the next retry, or an error if the maximum number of retries is exceeded.
NextDelay() (time.Duration, error)
// ResetDelay resets the retry delay
ResetDelay()
// DecreaseDelay returns the decreased delay for the next retry
DecreaseDelay() time.Duration
}

// exponentialBackOffHandler is a backoff handler that returns retry delays semi-exponentially with random jitter within boundary.
// exponentialBackOffHandler returns ErrRetriesExceeded when maximum number of retries has reached.
type exponentialBackOffHandler struct {
// exponentialBackoffHandler is a backoff handler that returns retry delays semi-exponentially, with random jitter within a boundary.
// exponentialBackoffHandler returns ErrRetriesExceeded when the maximum number of retries has been reached.
type exponentialBackoffHandler struct {
lock sync.Mutex
lastRetryDelay time.Duration
retryCount int
@@ -45,8 +47,8 @@ type exponentialBackOffHandler struct {
maxRetryDelay time.Duration
}

func NewExponentialBackendOffHandler(maxRetries int, minRetryDelay, maxRetryDelay time.Duration) *exponentialBackOffHandler {
return &exponentialBackOffHandler{
func NewExponentialBackoffHandler(maxRetries int, minRetryDelay, maxRetryDelay time.Duration) BackoffHandler {
return &exponentialBackoffHandler{
lastRetryDelay: time.Duration(0),
retryCount: 0,
maxRetries: maxRetries,
@@ -55,8 +57,8 @@ func NewExponentialBackendOffHandler(maxRetries int, minRetryDelay, maxRetryDela
}
}

// NextRetryDelay returns the next back off delay for retry.
func (handler *exponentialBackOffHandler) NextRetryDelay() (time.Duration, error) {
// NextDelay returns the next backoff delay for a retry.
func (handler *exponentialBackoffHandler) NextDelay() (time.Duration, error) {
handler.lock.Lock()
defer handler.lock.Unlock()
handler.retryCount += 1
@@ -72,10 +74,26 @@ func (handler *exponentialBackOffHandler) NextRetryDelay() (time.Duration, error
return handler.lastRetryDelay, nil
}

// ResetRetryDelay resets the retry delay.
func (handler *exponentialBackOffHandler) ResetRetryDelay() {
// ResetDelay resets the retry delay.
func (handler *exponentialBackoffHandler) ResetDelay() {
handler.lock.Lock()
defer handler.lock.Unlock()
handler.retryCount = 0
handler.lastRetryDelay = time.Duration(0)
}

// DecreaseDelay returns the decreased delay for the next retry.
func (handler *exponentialBackoffHandler) DecreaseDelay() time.Duration {
handler.lock.Lock()
defer handler.lock.Unlock()
handler.retryCount = 0
if handler.lastRetryDelay <= handler.minRetryDelay {
handler.lastRetryDelay = time.Duration(0)
} else {
handler.lastRetryDelay = handler.lastRetryDelay*2 - wait.Jitter(handler.lastRetryDelay, 0.5)
if handler.lastRetryDelay < handler.minRetryDelay {
handler.lastRetryDelay = handler.minRetryDelay
}
}
return handler.lastRetryDelay
}
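For orientation, here is a minimal sketch of how a caller could drive this interface: retry with NextDelay between attempts and relax the shared delay with DecreaseDelay on success. This is an illustration, not code from this commit; the import path, package name, and helper function are assumptions.

```go
package example

import (
	"time"

	"k8s.io/ingress-gce/pkg/backoff" // assumed import path for the new package
)

// callWithBackoff retries fn using a shared handler, sleeping for the
// handler-provided delay between attempts and relaxing the delay on success.
// A shared handler would be created once, e.g.:
//
//	handler := backoff.NewExponentialBackoffHandler(15, 5*time.Second, 5*time.Minute)
func callWithBackoff(handler backoff.BackoffHandler, fn func() error) error {
	for {
		if err := fn(); err == nil {
			// Shrink the shared delay gradually instead of resetting it, so one
			// success after a run of quota errors keeps some throttling in place.
			handler.DecreaseDelay()
			return nil
		}
		delay, backoffErr := handler.NextDelay()
		if backoffErr != nil {
			return backoffErr // backoff.ErrRetriesExceeded once maxRetries is hit
		}
		time.Sleep(delay)
	}
}
```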
113 changes: 113 additions & 0 deletions pkg/backoff/backoff_test.go
@@ -0,0 +1,113 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package backoff

import (
"testing"
"time"
)

const (
testRetry = 15
testMinRetryDelay = 5 * time.Second
testMaxRetryDelay = 5 * time.Minute
)

func verifyError(t *testing.T, err, expectedErr error) {
t.Helper()

if err != expectedErr {
t.Errorf("Expect error to be %v, but got %v", expectedErr, err)
}
}

func verifyExactDelay(t *testing.T, delay, expectedDelay time.Duration) {
t.Helper()

if delay != expectedDelay {
t.Errorf("Expect retry delay = %v, but got %v", expectedDelay, delay)
}
}

func verifyIntervalDelay(t *testing.T, delay, expectedMinDelay, expectedMaxDelay time.Duration) {
t.Helper()

if delay < expectedMinDelay || delay > expectedMaxDelay {
t.Errorf("Expect retry delay between %v and %v, but got %v", expectedMinDelay, expectedMaxDelay, delay)
}
}

// verifyMaxRetries checks that the delay increases on every retry until
// testMaxRetryDelay is reached, and that after the maximum number of retries
// BackoffHandler returns ErrRetriesExceeded when asked for the next delay.
func verifyMaxRetries(t *testing.T, initialDelay time.Duration, handler BackoffHandler) time.Duration {
t.Helper()

expectedDelay := initialDelay
for i := 0; i < testRetry; i++ {
delay, err := handler.NextDelay()
verifyError(t, err, nil)
expectedMaxDelay := expectedDelay * 2
if expectedMaxDelay > testMaxRetryDelay {
expectedMaxDelay = testMaxRetryDelay
}
verifyIntervalDelay(t, delay, expectedDelay, expectedMaxDelay)
expectedDelay = delay
}
_, err := handler.NextDelay()
verifyError(t, err, ErrRetriesExceeded)
return expectedDelay
}

func TestExponentialBackoffHandler(t *testing.T) {
t.Parallel()

handler := NewExponentialBackoffHandler(testRetry, testMinRetryDelay, testMaxRetryDelay)
verifyMaxRetries(t, testMinRetryDelay, handler)
handler.ResetDelay()

delay, err := handler.NextDelay()
verifyError(t, err, nil)
verifyExactDelay(t, delay, testMinRetryDelay)
}

func TestExponentialBackoffHandlerDecreaseDelay(t *testing.T) {
t.Parallel()

handler := NewExponentialBackoffHandler(testRetry, testMinRetryDelay, testMaxRetryDelay)
expectedDelay := verifyMaxRetries(t, testMinRetryDelay, handler)

delay := handler.DecreaseDelay()
verifyIntervalDelay(t, delay, expectedDelay/2, expectedDelay)
verifyMaxRetries(t, delay, handler)

for expectedDelay != testMinRetryDelay {
delay = handler.DecreaseDelay()
expectedMinDelay := expectedDelay / 2
if expectedMinDelay < testMinRetryDelay {
expectedMinDelay = testMinRetryDelay
}
verifyIntervalDelay(t, delay, expectedMinDelay, expectedDelay)
expectedDelay = delay
}

// The second iteration checks that the delay stays at zero and does not go back up to the min delay.
for i := 0; i < 2; i++ {
delay = handler.DecreaseDelay()
verifyExactDelay(t, delay, time.Duration(0))
}
}
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package syncers
package backoff

import (
"fmt"
@@ -25,8 +25,8 @@ import (

var ErrHandlerRetrying = fmt.Errorf("retry handler is retrying")

// retryHandler encapsulates logic that handles retry
type retryHandler interface {
// RetryHandler encapsulates logic that handles retry
type RetryHandler interface {
// Retry triggers retry
Retry() error
// Reset resets handler internals
@@ -43,13 +43,13 @@ type backoffRetryHandler struct {

// backoff delay handling
clock clock.Clock
backoff backoffHandler
backoff BackoffHandler

// retryFunc called on retry
retryFunc func()
}

func NewDelayRetryHandler(retryFunc func(), backoff backoffHandler) *backoffRetryHandler {
func NewDelayRetryHandler(retryFunc func(), backoff BackoffHandler) RetryHandler {
return &backoffRetryHandler{
retrying: false,
clock: clock.RealClock{},
@@ -68,7 +68,7 @@ func (h *backoffRetryHandler) Retry() error {
return ErrHandlerRetrying
}

delay, err := h.backoff.NextRetryDelay()
delay, err := h.backoff.NextDelay()
if err != nil {
return err
}
@@ -89,5 +89,5 @@

// Reset resets internal back off delay handler
func (h *backoffRetryHandler) Reset() {
h.backoff.ResetRetryDelay()
h.backoff.ResetDelay()
}
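As a rough illustration of how the RetryHandler above might be wired into a syncer-style loop, here is a hedged sketch: failures re-queue a sync after the backoff delay, successes reset the delay. The channel, loop, and doSync function are invented for this sketch and are not the syncer code in this repository; the import path is an assumption.

```go
package example

import (
	"time"

	"k8s.io/ingress-gce/pkg/backoff" // assumed import path for the new package
)

// runSyncLoop shows one possible wiring of RetryHandler into a work loop.
func runSyncLoop(syncCh chan struct{}, doSync func() error) {
	retry := backoff.NewDelayRetryHandler(
		func() {
			// retryFunc: re-queue a sync without blocking if one is already pending.
			select {
			case syncCh <- struct{}{}:
			default:
			}
		},
		backoff.NewExponentialBackoffHandler(15, 5*time.Second, 5*time.Minute),
	)

	for range syncCh {
		if err := doSync(); err != nil {
			// Retry returns ErrHandlerRetrying if a retry is already scheduled;
			// either way another sync arrives on syncCh after the delay.
			_ = retry.Retry()
			continue
		}
		retry.Reset()
	}
}
```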
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package syncers
package backoff

import (
"sync"
@@ -60,9 +60,13 @@ func verifyRetryHandler(t *testing.T, expectCount int, helper *retryHandlerTestH

func TestBackoffRetryHandler_Retry(t *testing.T) {
helper := &retryHandlerTestHelper{}
handler := NewDelayRetryHandler(helper.incrementCount, NewExponentialBackendOffHandler(testMaxRetries, smallTestRetryDelay, testMaxRetryDelay))
fakeClock := clocktesting.NewFakeClock(time.Now())
handler.clock = fakeClock
handler := &backoffRetryHandler{
retrying: false,
clock: fakeClock,
backoff: NewExponentialBackoffHandler(testMaxRetries, smallTestRetryDelay, testMaxRetryDelay),
retryFunc: helper.incrementCount,
}
delay := smallTestRetryDelay

// Trigger 2 Retries and verify that only one actual retry happens
16 changes: 15 additions & 1 deletion pkg/flags/flags.go
@@ -201,7 +201,21 @@ Use the flag more than once to rate limit more than one call. If you do not
specify this flag, the default is to rate limit Operations.Get for all versions.
If you do specify this flag one or more times, this default will be overwritten.
If you want to still use the default, simply specify it along with your other
values.`)
values.
A dynamic throttling strategy is also supported by this flag. Example usage:
--gce-ratelimit=ga.NetworkEndpointGroups.ListNetworkEndpoints,strategy,dynamic,10ms,5s,5,2,5,5s,30s
(use dynamic throttling strategy for ga.NetworkEndpointGroups.ListNetworkEndpoints
with the following parameters:
minimum delay = 10ms
maximum delay = 5s
number of quota errors before increasing the delay = 5
number of requests without quota errors before decreasing the delay = 2
number of requests without quota errors before resetting the delay = 5
the amount of time without any requests before decreasing the delay = 5s
the amount of time without any requests before resetting the delay = 30s
Dynamic throttling can be combined with the QPS rate limiter for a single API;
in that case dynamic throttling is applied first, and the QPS rate limiter then
introduces additional delay if needed.`)
flag.Float64Var(&F.GCERateLimitScale, "gce-ratelimit-scale", 1.0,
`Optional, scales rate limit options by a constant factor.
1.0 is no multiplier. 5.0 means increase all rate and capacity by 5x.`)
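To make the parameter list in the flag help above concrete, here is a simplified sketch of the kind of bookkeeping a dynamic throttling strategy performs: quota errors grow the delay, error-free requests shrink or reset it. This is an illustration only; the type and field names are invented, the idle-time parameters are omitted for brevity, and it is not the throttler implementation added by this commit.

```go
package example

import "time"

// dynamicThrottler sketches the strategy the flag parameters describe.
type dynamicThrottler struct {
	minDelay, maxDelay time.Duration
	errsToIncrease     int // quota errors before increasing the delay
	okToDecrease       int // error-free requests before decreasing the delay
	okToReset          int // error-free requests before resetting the delay

	delay     time.Duration
	quotaErrs int
	okCount   int
}

// Observe records one request outcome and returns the delay to apply before
// the next request to the same API.
func (d *dynamicThrottler) Observe(quotaErr bool) time.Duration {
	if quotaErr {
		d.okCount = 0
		d.quotaErrs++
		if d.quotaErrs >= d.errsToIncrease {
			d.quotaErrs = 0
			switch {
			case d.delay == 0:
				d.delay = d.minDelay
			case d.delay*2 > d.maxDelay:
				d.delay = d.maxDelay
			default:
				d.delay *= 2
			}
		}
		return d.delay
	}
	d.quotaErrs = 0
	d.okCount++
	switch {
	case d.okCount >= d.okToReset:
		d.okCount = 0
		d.delay = 0
	case d.okCount >= d.okToDecrease:
		d.delay /= 2
		if d.delay < d.minDelay {
			d.delay = 0
		}
	}
	return d.delay
}
```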
20 changes: 12 additions & 8 deletions pkg/neg/readiness/poller.go
@@ -162,14 +162,18 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) {
// TODO(freehan): filter to the NEs of interest once the API supports it
res, err := p.negCloud.ListNetworkEndpoints(key.Name, key.Zone /*showHealthStatus*/, true, key.SyncerKey.GetAPIVersion())
if err != nil {
// On receiving a GCE API error, do not retry immediately. This prevents the readiness reflector from overwhelming the GCE NEG API
// when rate limiting is in effect and from causing NEG syncers to back off. It effectively batches NEG health status updates
// for 100s. Pods added to the NEG during these 100s will not be marked Ready until the next status poll executes. However, pods
// that are not yet marked Ready but still pass the LB health check will serve LB traffic. The side effect during the delay period
// is that rollout of the workload (depending on rollout strategy) might slow down.
// TODO(freehan): enable exponential backoff.
p.logger.Error(err, "Failed to ListNetworkEndpoint in NEG. Retrying after some time.", "neg", key.String(), "retryDelay", retryDelay.String())
<-p.clock.After(retryDelay)
if negtypes.IsStrategyQuotaError(err) {
p.logger.V(4).Error(err, "Failed to ListNetworkEndpoints in NEG", "neg", key.String())
} else {
// On receiving a GCE API error, do not retry immediately. This prevents the readiness reflector from overwhelming the GCE NEG API
// when rate limiting is in effect and from causing NEG syncers to back off. It effectively batches NEG health status updates
// for 100s. Pods added to the NEG during these 100s will not be marked Ready until the next status poll executes. However, pods
// that are not yet marked Ready but still pass the LB health check will serve LB traffic. The side effect during the delay period
// is that rollout of the workload (depending on rollout strategy) might slow down.
// TODO(freehan): enable exponential backoff.
p.logger.Error(err, "Failed to ListNetworkEndpoints in NEG. Retrying after some time.", "neg", key.String(), "retryDelay", retryDelay.String())
<-p.clock.After(retryDelay)
}
return true, err
}
