Skip to content

Commit

Permalink
feat: use rate limited queue (argoproj#15480)
Browse files Browse the repository at this point in the history
* feat: use rate limited queue

Signed-off-by: Soumya Ghosh Dastidar <gdsoumya@gmail.com>
Signed-off-by: jmilic1 <70441727+jmilic1@users.noreply.github.com>
  • Loading branch information
gdsoumya authored and jmilic1 committed Nov 13, 2023
1 parent e02f7f4 commit ba20eff
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"time"

"github.com/argoproj/argo-cd/v2/pkg/ratelimiter"
"github.com/argoproj/pkg/stats"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -45,6 +46,7 @@ const (

func NewCommand() *cobra.Command {
var (
workqueueRateLimit ratelimiter.AppControllerRateLimiterConfig
clientConfig clientcmd.ClientConfig
appResyncPeriod int64
appHardResyncPeriod int64
Expand Down Expand Up @@ -160,6 +162,7 @@ func NewCommand() *cobra.Command {
persistResourceHealth,
clusterFilter,
applicationNamespaces,
&workqueueRateLimit,
)
errors.CheckError(err)
cacheutil.CollectMetrics(redisClient, appController.GetMetricsServer())
Expand Down Expand Up @@ -205,6 +208,15 @@ func NewCommand() *cobra.Command {
command.Flags().StringSliceVar(&applicationNamespaces, "application-namespaces", env.StringsFromEnv("ARGOCD_APPLICATION_NAMESPACES", []string{}, ","), "List of additional namespaces that applications are allowed to be reconciled from")
command.Flags().BoolVar(&persistResourceHealth, "persist-resource-health", env.ParseBoolFromEnv("ARGOCD_APPLICATION_CONTROLLER_PERSIST_RESOURCE_HEALTH", true), "Enables storing the managed resources health in the Application CRD")
command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvControllerShardingAlgorithm, common.DefaultShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy, round-robin] ")
// global queue rate limit config
command.Flags().Int64Var(&workqueueRateLimit.BucketSize, "wq-bucket-size", env.ParseInt64FromEnv("WORKQUEUE_BUCKET_SIZE", 500, 1, math.MaxInt64), "Set Workqueue Rate Limiter Bucket Size, default 500")
command.Flags().Int64Var(&workqueueRateLimit.BucketQPS, "wq-bucket-qps", env.ParseInt64FromEnv("WORKQUEUE_BUCKET_QPS", 50, 1, math.MaxInt64), "Set Workqueue Rate Limiter Bucket QPS, default 50")
// individual item rate limit config
// when WORKQUEUE_FAILURE_COOLDOWN is 0 per item rate limiting is disabled(default)
command.Flags().DurationVar(&workqueueRateLimit.FailureCoolDown, "wq-cooldown-ns", time.Duration(env.ParseInt64FromEnv("WORKQUEUE_FAILURE_COOLDOWN_NS", 0, 0, (24*time.Hour).Nanoseconds())), "Set Workqueue Per Item Rate Limiter Cooldown duration in ns, default 0(per item rate limiter disabled)")
command.Flags().DurationVar(&workqueueRateLimit.BaseDelay, "wq-basedelay-ns", time.Duration(env.ParseInt64FromEnv("WORKQUEUE_BASE_DELAY_NS", time.Millisecond.Nanoseconds(), time.Nanosecond.Nanoseconds(), (24*time.Hour).Nanoseconds())), "Set Workqueue Per Item Rate Limiter Base Delay duration in nanoseconds, default 1000000 (1ms)")
command.Flags().DurationVar(&workqueueRateLimit.MaxDelay, "wq-maxdelay-ns", time.Duration(env.ParseInt64FromEnv("WORKQUEUE_MAX_DELAY_NS", time.Second.Nanoseconds(), 1*time.Millisecond.Nanoseconds(), (24*time.Hour).Nanoseconds())), "Set Workqueue Per Item Rate Limiter Max Delay duration in nanoseconds, default 1000000000 (1s)")
command.Flags().Float64Var(&workqueueRateLimit.BackoffFactor, "wq-backoff-factor", env.ParseFloat64FromEnv("WORKQUEUE_BACKOFF_FACTOR", 1.5, 0, math.MaxFloat64), "Set Workqueue Per Item Rate Limiter Backoff Factor, default is 1.5")
command.Flags().BoolVar(&enableDynamicClusterDistribution, "dynamic-cluster-distribution-enabled", env.ParseBoolFromEnv(common.EnvEnableDynamicClusterDistribution, false), "Enables dynamic cluster distribution.")
cacheSource = appstatecache.AddCacheFlagsToCmd(&command, func(client *redis.Client) {
redisClient = client
Expand Down
30 changes: 19 additions & 11 deletions controller/appcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync"
"time"

"github.com/argoproj/argo-cd/v2/pkg/ratelimiter"
clustercache "github.com/argoproj/gitops-engine/pkg/cache"
"github.com/argoproj/gitops-engine/pkg/diff"
"github.com/argoproj/gitops-engine/pkg/health"
Expand Down Expand Up @@ -148,20 +149,25 @@ func NewApplicationController(
persistResourceHealth bool,
clusterFilter func(cluster *appv1.Cluster) bool,
applicationNamespaces []string,
rateLimiterConfig *ratelimiter.AppControllerRateLimiterConfig,
) (*ApplicationController, error) {
log.Infof("appResyncPeriod=%v, appHardResyncPeriod=%v", appResyncPeriod, appHardResyncPeriod)
db := db.NewDB(namespace, settingsMgr, kubeClientset)
if rateLimiterConfig == nil {
rateLimiterConfig = ratelimiter.GetDefaultAppRateLimiterConfig()
log.Info("Using default workqueue rate limiter config")
}
ctrl := ApplicationController{
cache: argoCache,
namespace: namespace,
kubeClientset: kubeClientset,
kubectl: kubectl,
applicationClientset: applicationClientset,
repoClientset: repoClientset,
appRefreshQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "app_reconciliation_queue"),
appOperationQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "app_operation_processing_queue"),
projectRefreshQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "project_reconciliation_queue"),
appComparisonTypeRefreshQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
appRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_reconciliation_queue"),
appOperationQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "app_operation_processing_queue"),
projectRefreshQueue: workqueue.NewNamedRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig), "project_reconciliation_queue"),
appComparisonTypeRefreshQueue: workqueue.NewRateLimitingQueue(ratelimiter.NewCustomAppControllerRateLimiter(rateLimiterConfig)),
db: db,
statusRefreshTimeout: appResyncPeriod,
statusHardRefreshTimeout: appHardResyncPeriod,
Expand All @@ -185,7 +191,7 @@ func NewApplicationController(
_, err = projInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
ctrl.projectRefreshQueue.Add(key)
ctrl.projectRefreshQueue.AddRateLimited(key)
if projMeta, ok := obj.(metav1.Object); ok {
ctrl.InvalidateProjectsCache(projMeta.GetName())
}
Expand All @@ -194,14 +200,15 @@ func NewApplicationController(
},
UpdateFunc: func(old, new interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(new); err == nil {
ctrl.projectRefreshQueue.Add(key)
ctrl.projectRefreshQueue.AddRateLimited(key)
if projMeta, ok := new.(metav1.Object); ok {
ctrl.InvalidateProjectsCache(projMeta.GetName())
}
}
},
DeleteFunc: func(obj interface{}) {
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
// immediately push to queue for deletes
ctrl.projectRefreshQueue.Add(key)
if projMeta, ok := obj.(metav1.Object); ok {
ctrl.InvalidateProjectsCache(projMeta.GetName())
Expand Down Expand Up @@ -815,8 +822,8 @@ func (ctrl *ApplicationController) requestAppRefresh(appName string, compareWith
ctrl.appRefreshQueue.AddAfter(key, *after)
ctrl.appOperationQueue.AddAfter(key, *after)
} else {
ctrl.appRefreshQueue.Add(key)
ctrl.appOperationQueue.Add(key)
ctrl.appRefreshQueue.AddRateLimited(key)
ctrl.appOperationQueue.AddRateLimited(key)
}
}
}
Expand Down Expand Up @@ -1991,8 +1998,8 @@ func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.Shar
}
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
ctrl.appRefreshQueue.Add(key)
ctrl.appOperationQueue.Add(key)
ctrl.appRefreshQueue.AddRateLimited(key)
ctrl.appOperationQueue.AddRateLimited(key)
}
},
UpdateFunc: func(old, new interface{}) {
Expand All @@ -2012,7 +2019,7 @@ func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.Shar
compareWith = CompareWithLatest.Pointer()
}
ctrl.requestAppRefresh(newApp.QualifiedName(), compareWith, nil)
ctrl.appOperationQueue.Add(key)
ctrl.appOperationQueue.AddRateLimited(key)
},
DeleteFunc: func(obj interface{}) {
if !ctrl.canProcessApp(obj) {
Expand All @@ -2022,6 +2029,7 @@ func (ctrl *ApplicationController) newApplicationInformerAndLister() (cache.Shar
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
// for deletes, we immediately add to the refresh queue
ctrl.appRefreshQueue.Add(key)
}
},
Expand Down
11 changes: 6 additions & 5 deletions controller/appcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func newFakeController(data *fakeData) *ApplicationController {
true,
nil,
data.applicationNamespaces,
nil,
)
if err != nil {
panic(err)
Expand Down Expand Up @@ -793,7 +794,7 @@ func TestNormalizeApplication(t *testing.T) {
// Verify we normalize the app because project is missing
ctrl := newFakeController(&data)
key, _ := cache.MetaNamespaceKeyFunc(app)
ctrl.appRefreshQueue.Add(key)
ctrl.appRefreshQueue.AddRateLimited(key)
fakeAppCs := ctrl.applicationClientset.(*appclientset.Clientset)
fakeAppCs.ReactionChain = nil
normalized := false
Expand All @@ -815,7 +816,7 @@ func TestNormalizeApplication(t *testing.T) {
data.apps[0] = app
ctrl := newFakeController(&data)
key, _ := cache.MetaNamespaceKeyFunc(app)
ctrl.appRefreshQueue.Add(key)
ctrl.appRefreshQueue.AddRateLimited(key)
fakeAppCs := ctrl.applicationClientset.(*appclientset.Clientset)
fakeAppCs.ReactionChain = nil
normalized := false
Expand Down Expand Up @@ -1278,7 +1279,7 @@ func TestUpdateReconciledAt(t *testing.T) {
t.Run("UpdatedOnFullReconciliation", func(t *testing.T) {
receivedPatch = map[string]interface{}{}
ctrl.requestAppRefresh(app.Name, CompareWithLatest.Pointer(), nil)
ctrl.appRefreshQueue.Add(key)
ctrl.appRefreshQueue.AddRateLimited(key)

ctrl.processAppRefreshQueueItem()

Expand All @@ -1293,7 +1294,7 @@ func TestUpdateReconciledAt(t *testing.T) {

t.Run("NotUpdatedOnPartialReconciliation", func(t *testing.T) {
receivedPatch = map[string]interface{}{}
ctrl.appRefreshQueue.Add(key)
ctrl.appRefreshQueue.AddRateLimited(key)
ctrl.requestAppRefresh(app.Name, CompareWithRecent.Pointer(), nil)

ctrl.processAppRefreshQueueItem()
Expand Down Expand Up @@ -1323,7 +1324,7 @@ func TestProjectErrorToCondition(t *testing.T) {
managedLiveObjs: make(map[kube.ResourceKey]*unstructured.Unstructured),
})
key, _ := cache.MetaNamespaceKeyFunc(app)
ctrl.appRefreshQueue.Add(key)
ctrl.appRefreshQueue.AddRateLimited(key)
ctrl.requestAppRefresh(app.Name, CompareWithRecent.Pointer(), nil)

ctrl.processAppRefreshQueueItem()
Expand Down
53 changes: 53 additions & 0 deletions docs/operator-manual/high_availability.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,56 @@ spec:
path: my-application
# ...
```

## Rate Limiting Application Reconciliations

To prevent high controller resource usage or sync loops caused either by misbehaving apps or by other environment-specific factors,
we can configure rate limits on the workqueues used by the application controller. There are two types of rate limits that can be configured:

* Global rate limits
* Per item rate limits

The final rate limiter uses a combination of both and calculates the final backoff as `max(globalBackoff, perItemBackoff)`.

### Global rate limits

This is enabled by default, it is a simple bucket based rate limiter that limits the number of items that can be queued per second.
This is useful to prevent a large number of apps from being queued at the same time.

To configure the bucket limiter you can set the following environment variables:

* `WORKQUEUE_BUCKET_SIZE` - The number of items that can be queued in a single burst. Defaults to 500.
* `WORKQUEUE_BUCKET_QPS` - The number of items that can be queued per second. Defaults to 50.

### Per item rate limits

By default this returns a fixed base delay/backoff value, but it can be configured to return exponentially increasing values; read further to understand how it works.
Per item rate limiter limits the number of times a particular item can be queued. This is based on exponential backoff where the backoff time for an item keeps increasing exponentially
if it is queued multiple times in a short period, but the backoff is reset automatically if a configured `cool down` period has elapsed since the last time the item was queued.

To configure the per item limiter you can set the following environment variables:

* `WORKQUEUE_FAILURE_COOLDOWN_NS` : The cool down period in nanoseconds; once this period has elapsed for an item, its backoff is reset. Exponential backoff is disabled if set to 0 (default). Example value: 10 * 10^9 (=10s)
* `WORKQUEUE_BASE_DELAY_NS` : The base delay in nanoseconds, this is the initial backoff used in the exponential backoff formula. Defaults to 1000000 (=1ms)
* `WORKQUEUE_MAX_DELAY_NS` : The max delay in nanoseconds, this is the max backoff limit. Defaults to 10^9 (=1s)
* `WORKQUEUE_BACKOFF_FACTOR` : The backoff factor, this is the factor by which the backoff is increased for each retry. Defaults to 1.5

The formula used to calculate the backoff time for an item, where `numRequeue` is the number of times the item has been queued
and `lastRequeueTime` is the time at which the item was last queued:

- When `WORKQUEUE_FAILURE_COOLDOWN_NS` != 0 :

```
backoff = time.Since(lastRequeueTime) >= WORKQUEUE_FAILURE_COOLDOWN_NS ?
WORKQUEUE_BASE_DELAY_NS :
min(
WORKQUEUE_MAX_DELAY_NS,
WORKQUEUE_BASE_DELAY_NS * WORKQUEUE_BACKOFF_FACTOR ^ (numRequeue)
)
```

- When `WORKQUEUE_FAILURE_COOLDOWN_NS` = 0 :

```
backoff = WORKQUEUE_BASE_DELAY_NS
```
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,11 @@ argocd-application-controller [flags]
--token string Bearer token for authentication to the API server
--user string The name of the kubeconfig user to use
--username string Username for basic authentication to the API server
--wq-backoff-factor float Set Workqueue Per Item Rate Limiter Backoff Factor, default is 1.5 (default 1.5)
--wq-basedelay-ns duration Set Workqueue Per Item Rate Limiter Base Delay duration in nanoseconds, default 1000000 (1ms) (default 1ms)
--wq-bucket-qps int Set Workqueue Rate Limiter Bucket QPS, default 50 (default 50)
--wq-bucket-size int Set Workqueue Rate Limiter Bucket Size, default 500 (default 500)
--wq-cooldown-ns duration Set Workqueue Per Item Rate Limiter Cooldown duration in ns, default 0(per item rate limiter disabled)
--wq-maxdelay-ns duration Set Workqueue Per Item Rate Limiter Max Delay duration in nanoseconds, default 1000000000 (1s) (default 1s)
```

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ require (
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/time v0.3.0
golang.org/x/tools v0.7.0 // indirect
gomodules.xyz/envconfig v1.3.1-0.20190308184047-426f31af0d45 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
Expand Down
123 changes: 123 additions & 0 deletions pkg/ratelimiter/ratelimiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package ratelimiter

import (
"math"
"sync"
"time"

"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"
)

// AppControllerRateLimiterConfig groups the settings consumed by
// NewCustomAppControllerRateLimiter: two for the global token bucket and four
// for the per-item exponential limiter.
//
// NOTE: GetDefaultAppRateLimiterConfig builds this struct with an unkeyed
// literal, so the field order below is significant.
type AppControllerRateLimiterConfig struct {
// BucketSize is passed as the burst size of the global token bucket.
BucketSize int64
// BucketQPS is passed as the rate limit of the global token bucket.
BucketQPS int64
// FailureCoolDown is passed as the cool down period of the per-item limiter.
FailureCoolDown time.Duration
// BaseDelay is passed as the base delay of the per-item limiter.
BaseDelay time.Duration
// MaxDelay is passed as the maximum delay of the per-item limiter.
MaxDelay time.Duration
// BackoffFactor is passed as the backoff factor of the per-item limiter.
BackoffFactor float64
}

// GetDefaultAppRateLimiterConfig returns a newly allocated configuration
// populated with the default workqueue rate limiter settings.
func GetDefaultAppRateLimiterConfig() *AppControllerRateLimiterConfig {
	defaults := AppControllerRateLimiterConfig{
		// Global (token bucket) limits shared by every item in a queue.
		BucketSize: 500,
		BucketQPS:  50,

		// Per-item limits. A zero cool down (WORKQUEUE_FAILURE_COOLDOWN_NS)
		// leaves per-item exponential backoff disabled, which is the default.
		FailureCoolDown: 0,
		BaseDelay:       time.Millisecond,
		MaxDelay:        time.Second,
		BackoffFactor:   1.5,
	}
	return &defaults
}

// NewCustomAppControllerRateLimiter builds the rate limiter used by the app
// controller workqueues. It combines a per-item exponential limiter (with
// automatic resets) and a global token bucket limiter through
// workqueue.NewMaxOfRateLimiter.
//
// cfg must be non-nil; its fields are read directly.
func NewCustomAppControllerRateLimiter(cfg *AppControllerRateLimiterConfig) workqueue.RateLimiter {
	// Per-item limiter: exponential backoff governed by the cool down period.
	perItem := NewItemExponentialRateLimiterWithAutoReset(cfg.BaseDelay, cfg.MaxDelay, cfg.FailureCoolDown, cfg.BackoffFactor)

	// Global limiter: token bucket with BucketQPS as rate and BucketSize as burst.
	bucket := rate.NewLimiter(rate.Limit(cfg.BucketQPS), int(cfg.BucketSize))
	global := &workqueue.BucketRateLimiter{Limiter: bucket}

	return workqueue.NewMaxOfRateLimiter(perItem, global)
}

// failureData is the per-item state kept by
// ItemExponentialRateLimiterWithAutoReset.
type failureData struct {
// failures is the counter incremented by When and reported by NumRequeues.
failures int
// lastFailure is the time of the most recent When call recorded for the item.
lastFailure time.Time
}

// ItemExponentialRateLimiterWithAutoReset is a per-item rate limiter whose
// delay is baseDelay*backoffFactor^<num-failures>, capped at maxDelay.
// An item's failure history is dropped automatically once the coolDown period
// has elapsed since its last recorded call, or explicitly via Forget.
type ItemExponentialRateLimiterWithAutoReset struct {
// failuresLock is held by every method while it accesses failures.
failuresLock sync.Mutex
// failures maps each tracked item to its failure count and last call time.
failures map[interface{}]failureData

// baseDelay is the delay returned for an item with no recorded failures.
baseDelay time.Duration
// maxDelay is the upper bound applied to the computed backoff.
maxDelay time.Duration
// coolDown is the elapsed time after which an item's history is reset.
coolDown time.Duration
// backoffFactor is the base of the exponential applied to baseDelay.
backoffFactor float64
}

// Compile-time assertion that the limiter implements workqueue.RateLimiter.
var _ workqueue.RateLimiter = &ItemExponentialRateLimiterWithAutoReset{}

// NewItemExponentialRateLimiterWithAutoReset returns a per-item limiter with
// an empty failure history and the given base delay, maximum delay, cool down
// period and backoff factor.
func NewItemExponentialRateLimiterWithAutoReset(baseDelay, maxDelay, failureCoolDown time.Duration, backoffFactor float64) workqueue.RateLimiter {
	limiter := new(ItemExponentialRateLimiterWithAutoReset)
	limiter.failures = make(map[interface{}]failureData)
	limiter.baseDelay = baseDelay
	limiter.maxDelay = maxDelay
	limiter.coolDown = failureCoolDown
	limiter.backoffFactor = backoffFactor
	return limiter
}

// When returns how long item should wait before being processed and records
// the call against item.
//
// An item that is not yet tracked starts with zero recorded failures. If
// coolDown or more has elapsed since the item's last recorded call (always the
// case when coolDown is zero or negative), the item's history is dropped and
// baseDelay is returned. Otherwise the failure count is incremented and the
// result is baseDelay * backoffFactor^failures, using the count from before
// the increment, capped at maxDelay.
func (r *ItemExponentialRateLimiterWithAutoReset) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	// Sample the clock once so the cool down check and the stored timestamp
	// agree with each other.
	now := time.Now()

	entry, tracked := r.failures[item]
	if !tracked {
		entry = failureData{lastFailure: now}
	}

	// Cool down reached (or per-item backoff disabled): reset the item.
	if now.Sub(entry.lastFailure) >= r.coolDown {
		delete(r.failures, item)
		return r.baseDelay
	}

	r.failures[item] = failureData{
		failures:    entry.failures + 1,
		lastFailure: now,
	}

	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(r.backoffFactor, float64(entry.failures))

	// math.MaxInt64 rounds up to 2^63 as a float64, so the comparison must be
	// inclusive: converting a float64 equal to 2^63 to time.Duration is out of
	// range and may produce a negative delay that bypasses the maxDelay cap.
	if backoff >= math.MaxInt64 {
		return r.maxDelay
	}

	if delay := time.Duration(backoff); delay <= r.maxDelay {
		return delay
	}
	return r.maxDelay
}

// NumRequeues reports the failure count currently recorded for item, or zero
// when the item is not tracked.
func (r *ItemExponentialRateLimiterWithAutoReset) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	entry, tracked := r.failures[item]
	if !tracked {
		return 0
	}
	return entry.failures
}

// Forget discards any failure history recorded for item. It is a no-op when
// the item is not tracked.
func (r *ItemExponentialRateLimiterWithAutoReset) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	if _, tracked := r.failures[item]; tracked {
		delete(r.failures, item)
	}
}
Loading

0 comments on commit ba20eff

Please sign in to comment.