Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick PR #416 to release-v1.2 #423

Merged
merged 1 commit into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion controllers/options.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package controllers

import "errors"
import (
"errors"

"k8s.io/client-go/util/workqueue"
)

// ControllerConfig is the configuration for cluster and machine controllers
type ControllerConfig struct {
MaxConcurrentReconciles int
RateLimiter workqueue.RateLimiter
}

// ControllerConfigOpts is a function that can be used to configure the controller config
Expand All @@ -20,3 +25,14 @@ func WithMaxConcurrentReconciles(max int) ControllerConfigOpts {
return nil
}
}

// WithRateLimiter sets the rate limiter for the controller
func WithRateLimiter(rateLimiter workqueue.RateLimiter) ControllerConfigOpts {
return func(c *ControllerConfig) error {
if rateLimiter == nil {
return errors.New("rate limiter cannot be nil")
}
c.RateLimiter = rateLimiter
return nil
}
}
36 changes: 36 additions & 0 deletions controllers/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/client-go/util/workqueue"
)

func TestWithMaxConcurrentReconciles(t *testing.T) {
Expand Down Expand Up @@ -37,3 +38,38 @@ func TestWithMaxConcurrentReconciles(t *testing.T) {
})
}
}

func TestWithRateLimiter(t *testing.T) {
tests := []struct {
name string
rateLimiter workqueue.RateLimiter
expectError bool
expectedType interface{}
}{
{
name: "TestWithRateLimiterNil",
rateLimiter: nil,
expectError: true,
expectedType: nil,
},
{
name: "TestWithRateLimiterSet",
rateLimiter: workqueue.DefaultControllerRateLimiter(),
expectError: false,
expectedType: &workqueue.MaxOfRateLimiter{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
opt := WithRateLimiter(tt.rateLimiter)
config := &ControllerConfig{}
err := opt(config)
if tt.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.IsType(t, tt.expectedType, config.RateLimiter)
}
})
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.0
go.uber.org/zap v1.21.0
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9
k8s.io/api v0.25.2
k8s.io/apimachinery v0.25.2
k8s.io/client-go v0.25.2
Expand Down Expand Up @@ -116,7 +117,6 @@ require (
golang.org/x/sys v0.14.0 // indirect
golang.org/x/term v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90 // indirect
Expand Down
78 changes: 67 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,31 @@ limitations under the License.
package main

import (
"errors"
"flag"
"os"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.

"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
capiv1 "sigs.k8s.io/cluster-api/api/v1beta1"
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
//+kubebuilder:scaffold:imports

infrav1alpha4 "github.com/nutanix-cloud-native/cluster-api-provider-nutanix/api/v1alpha4"
infrav1beta1 "github.com/nutanix-cloud-native/cluster-api-provider-nutanix/api/v1beta1"
"github.com/nutanix-cloud-native/cluster-api-provider-nutanix/controllers"
//+kubebuilder:scaffold:imports
)

var (
Expand Down Expand Up @@ -82,6 +76,10 @@ func main() {
enableLeaderElection bool
probeAddr string
maxConcurrentReconciles int
baseDelay time.Duration
maxDelay time.Duration
bucketSize int
qps int
)

flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
Expand All @@ -99,6 +97,11 @@ func main() {
TimeEncoder: zapcore.RFC3339TimeEncoder,
}
opts.BindFlags(flag.CommandLine)

flag.DurationVar(&baseDelay, "rate-limiter-base-delay", 500*time.Millisecond, "The base delay for the rate limiter.")
flag.DurationVar(&maxDelay, "rate-limiter-max-delay", 15*time.Minute, "The maximum delay for the rate limiter.")
thunderboltsid marked this conversation as resolved.
Show resolved Hide resolved
flag.IntVar(&bucketSize, "rate-limiter-bucket-size", 100, "The bucket size for the rate limiter.")
flag.IntVar(&qps, "rate-limiter-qps", 10, "The QPS for the rate limiter.")
flag.Parse()

ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
Expand All @@ -117,7 +120,13 @@ func main() {
os.Exit(1)
}

// Set up the context that's going to be used in controllers and for the manager.
rateLimiter, err := compositeRateLimiter(baseDelay, maxDelay, bucketSize, qps)
if err != nil {
setupLog.Error(err, "unable to create composite rate limiter")
os.Exit(1)
}

// Setup the context that's going to be used in controllers and for the manager.
ctx := ctrl.SetupSignalHandler()

// Create a secret informer for the Nutanix client
Expand All @@ -143,6 +152,7 @@ func main() {
configMapInformer,
mgr.GetScheme(),
controllers.WithMaxConcurrentReconciles(maxConcurrentReconciles),
controllers.WithRateLimiter(rateLimiter),
)
if err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NutanixCluster")
Expand All @@ -159,6 +169,7 @@ func main() {
configMapInformer,
mgr.GetScheme(),
controllers.WithMaxConcurrentReconciles(maxConcurrentReconciles),
controllers.WithRateLimiter(rateLimiter),
)
if err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NutanixMachine")
Expand All @@ -185,3 +196,48 @@ func main() {
os.Exit(1)
}
}

// compositeRateLimiter will build a limiter similar to the default from DefaultControllerRateLimiter but with custom values.
func compositeRateLimiter(baseDelay, maxDelay time.Duration, bucketSize, qps int) (workqueue.RateLimiter, error) {
// Validate the rate limiter configuration
if err := validateRateLimiterConfig(baseDelay, maxDelay, bucketSize, qps); err != nil {
return nil, err
}
exponentialBackoffLimiter := workqueue.NewItemExponentialFailureRateLimiter(baseDelay, maxDelay)
bucketLimiter := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(qps), bucketSize)}
return workqueue.NewMaxOfRateLimiter(exponentialBackoffLimiter, bucketLimiter), nil
}

// validateRateLimiterConfig validates the rate limiter configuration parameters
func validateRateLimiterConfig(baseDelay, maxDelay time.Duration, bucketSize, qps int) error {
// Check if baseDelay is a non-negative value
if baseDelay < 0 {
return errors.New("baseDelay cannot be negative")
}

// Check if maxDelay is non-negative and greater than or equal to baseDelay
if maxDelay < 0 {
return errors.New("maxDelay cannot be negative")
}

if maxDelay < baseDelay {
return errors.New("maxDelay should be greater than or equal to baseDelay")
}

// Check if bucketSize is a positive number
if bucketSize <= 0 {
return errors.New("bucketSize must be positive")
}

// Check if qps is a positive number
if qps <= 0 {
return errors.New("minimum QPS must be positive")
}

// Check if bucketSize is at least as large as the QPS
if bucketSize < qps {
return errors.New("bucketSize must be at least as large as the QPS to handle bursts effectively")
}

return nil
}
Loading