add maxConcurrentCanaries flag to limit concurrent progressing canaries
This adds a flag to limit the number of concurrently progressing canaries,
to avoid a burst of resource requests when many canaries start at once.

The flag has no effect when set to "0", which is the default.

Closes fluxcd#1069

Signed-off-by: Louis Halbritter <halbritter@posteo.de>
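
For context, a minimal usage sketch (the value of 2 is illustrative, not part of this commit): setting the new chart value renders the corresponding controller argument in the Flagger deployment.

# Helm values override (illustrative): allow at most two canaries to
# progress at the same time; 0, the default, keeps the behaviour unlimited.
maxConcurrentCanaries: 2

# which the chart template below renders as the container argument:
#   - -max-concurrent-canaries=2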
louishalbritter committed Feb 8, 2024
1 parent 3f06a0b commit 0fbdead
Showing 7 changed files with 140 additions and 45 deletions.
3 changes: 3 additions & 0 deletions charts/flagger/templates/deployment.yaml
@@ -148,6 +148,9 @@ spec:
{{- if .Values.noCrossNamespaceRefs }}
- -no-cross-namespace-refs={{ .Values.noCrossNamespaceRefs }}
{{- end }}
{{- if .Values.maxConcurrentCanaries }}
- -max-concurrent-canaries={{ .Values.maxConcurrentCanaries }}
{{- end }}
livenessProbe:
exec:
command:
3 changes: 3 additions & 0 deletions cmd/flagger/main.go
@@ -86,6 +86,7 @@ var (
kubeconfigServiceMesh string
clusterName string
noCrossNamespaceRefs bool
maxConcurrentCanaries int
)

func init() {
@@ -121,6 +122,7 @@ func init() {
flag.StringVar(&kubeconfigServiceMesh, "kubeconfig-service-mesh", "", "Path to a kubeconfig for the service mesh control plane cluster.")
flag.StringVar(&clusterName, "cluster-name", "", "Cluster name to be included in alert msgs.")
flag.BoolVar(&noCrossNamespaceRefs, "no-cross-namespace-refs", false, "When set to true, Flagger can only refer to resources in the same namespace.")
flag.IntVar(&maxConcurrentCanaries, "max-concurrent-canaries", 0, "Limit parallel processing canaries. Unlimited if set to 0, which is default")
}

func main() {
@@ -253,6 +255,7 @@ func main() {
fromEnv("EVENT_WEBHOOK_URL", eventWebhook),
clusterName,
noCrossNamespaceRefs,
maxConcurrentCanaries,
)

// leader election context
83 changes: 43 additions & 40 deletions pkg/controller/controller.go
@@ -49,25 +49,27 @@ const controllerAgentName = "flagger"

// Controller is managing the canary objects and schedules canary deployments
type Controller struct {
kubeClient kubernetes.Interface
flaggerClient clientset.Interface
flaggerInformers Informers
flaggerSynced cache.InformerSynced
flaggerWindow time.Duration
workqueue workqueue.RateLimitingInterface
eventRecorder record.EventRecorder
logger *zap.SugaredLogger
canaries *sync.Map
jobs map[string]CanaryJob
recorder metrics.Recorder
notifier notifier.Interface
canaryFactory *canary.Factory
routerFactory *router.Factory
observerFactory *observers.Factory
meshProvider string
eventWebhook string
clusterName string
noCrossNamespaceRefs bool
kubeClient kubernetes.Interface
flaggerClient clientset.Interface
flaggerInformers Informers
flaggerSynced cache.InformerSynced
flaggerWindow time.Duration
workqueue workqueue.RateLimitingInterface
eventRecorder record.EventRecorder
logger *zap.SugaredLogger
canaries *sync.Map
jobs map[string]CanaryJob
recorder metrics.Recorder
notifier notifier.Interface
canaryFactory *canary.Factory
routerFactory *router.Factory
observerFactory *observers.Factory
meshProvider string
eventWebhook string
clusterName string
noCrossNamespaceRefs bool
pendingCanaries map[string]bool
maxConcurrentCanaries int
}

type Informers struct {
@@ -91,6 +93,7 @@ func NewController(
eventWebhook string,
clusterName string,
noCrossNamespaceRefs bool,
maxConcurrentCanaries int,
) *Controller {
logger.Debug("Creating event broadcaster")
flaggerscheme.AddToScheme(scheme.Scheme)
@@ -105,25 +108,27 @@
recorder.SetInfo(version, meshProvider)

ctrl := &Controller{
kubeClient: kubeClient,
flaggerClient: flaggerClient,
flaggerInformers: flaggerInformers,
flaggerSynced: flaggerInformers.CanaryInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
eventRecorder: eventRecorder,
logger: logger,
canaries: new(sync.Map),
jobs: map[string]CanaryJob{},
flaggerWindow: flaggerWindow,
observerFactory: observerFactory,
recorder: recorder,
notifier: notifier,
canaryFactory: canaryFactory,
routerFactory: routerFactory,
meshProvider: meshProvider,
eventWebhook: eventWebhook,
clusterName: clusterName,
noCrossNamespaceRefs: noCrossNamespaceRefs,
kubeClient: kubeClient,
flaggerClient: flaggerClient,
flaggerInformers: flaggerInformers,
flaggerSynced: flaggerInformers.CanaryInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
eventRecorder: eventRecorder,
logger: logger,
canaries: new(sync.Map),
jobs: map[string]CanaryJob{},
flaggerWindow: flaggerWindow,
observerFactory: observerFactory,
recorder: recorder,
notifier: notifier,
canaryFactory: canaryFactory,
routerFactory: routerFactory,
meshProvider: meshProvider,
eventWebhook: eventWebhook,
clusterName: clusterName,
noCrossNamespaceRefs: noCrossNamespaceRefs,
pendingCanaries: map[string]bool{},
maxConcurrentCanaries: maxConcurrentCanaries,
}

flaggerInformers.CanaryInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -232,7 +237,6 @@ func (c *Controller) processNextWorkItem() bool {
c.workqueue.Forget(obj)
return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
return true
@@ -302,7 +306,6 @@ func (c *Controller) syncHandler(key string) error {
if err := c.addFinalizer(cd); err != nil {
return fmt.Errorf("unable to add finalizer to canary %s.%s: %w", cd.Name, cd.Namespace, err)
}

}
c.logger.Infof("Synced %s", key)

17 changes: 12 additions & 5 deletions pkg/controller/scheduler.go
@@ -134,6 +134,7 @@ func (c *Controller) scheduleCanaries() {
for job := range c.jobs {
if _, exists := current[job]; !exists {
c.jobs[job].Stop()
delete(c.pendingCanaries, job)
delete(c.jobs, job)
}
}
@@ -280,11 +281,22 @@ func (c *Controller) advanceCanary(name string, namespace string) {
return
}

key := fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)

if !shouldAdvance {
delete(c.pendingCanaries, key)
c.recorder.SetStatus(cd, cd.Status.Phase)
return
}

if _, exists := c.pendingCanaries[key]; c.maxConcurrentCanaries > 0 && len(c.pendingCanaries) >= c.maxConcurrentCanaries && !exists {
canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhaseWaiting)
c.recordEventInfof(cd, "waiting with canary %v.%v %v to process, because maximum of concurrent canaries reached", cd.Name, cd.Namespace, cd.UID)
return
}

c.pendingCanaries[key] = true

maxWeight := c.maxWeight(cd)

// check primary status
@@ -476,7 +488,6 @@
}
c.runCanary(cd, canaryController, meshRouter, mirrored, canaryWeight, primaryWeight, maxWeight)
}

}

func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryController canary.Controller,
@@ -533,7 +544,6 @@ func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryCo
}

return

}

func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller,
@@ -720,7 +730,6 @@ func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController can
return
}
}

}

func (c *Controller) runAnalysis(canary *flaggerv1.Canary) bool {
@@ -844,7 +853,6 @@ func (c *Controller) shouldAdvance(canary *flaggerv1.Canary, canaryController ca
}

return newCfg, nil

}

func (c *Controller) checkCanaryStatus(canary *flaggerv1.Canary, canaryController canary.Controller, scalerReconciler canary.ScalerReconciler, shouldAdvance bool) bool {
@@ -1001,7 +1009,6 @@ func (c *Controller) setPhaseInitializing(cd *flaggerv1.Canary) error {
firstTry = false
return
})

if err != nil {
return fmt.Errorf("failed after retries: %w", err)
}
1 change: 1 addition & 0 deletions pkg/controller/scheduler_daemonset_fixture_test.go
@@ -120,6 +120,7 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture {
recorder: metrics.NewRecorder(controllerAgentName, false),
routerFactory: rf,
notifier: &notifier.NopNotifier{},
pendingCanaries: map[string]bool{},
}
ctrl.flaggerSynced = alwaysReady
ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(c)
1 change: 1 addition & 0 deletions pkg/controller/scheduler_deployment_fixture_test.go
@@ -149,6 +149,7 @@ func newDeploymentFixture(c *flaggerv1.Canary) fixture {
recorder: metrics.NewRecorder(controllerAgentName, false),
routerFactory: rf,
notifier: &notifier.NopNotifier{},
pendingCanaries: map[string]bool{},
}
ctrl.flaggerSynced = alwaysReady
ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(c)
77 changes: 77 additions & 0 deletions pkg/controller/scheduler_deployment_test.go
@@ -394,6 +394,83 @@ func TestScheduler_DeploymentPromotion(t *testing.T) {
assert.Equal(t, flaggerv1.CanaryPhaseSucceeded, c.Status.Phase)
}

func TestScheduler_DeploymentMaxConcurrent(t *testing.T) {
mocks := newDeploymentFixture(nil)

secondCanary := newDeploymentTestCanary()
secondCanary.Name = "podinfo2"

mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Create(context.TODO(), secondCanary, metav1.CreateOptions{})
mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(secondCanary)

// initializing
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo2", "default")

// make primary ready
mocks.makePrimaryReady(t)

// initialized
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo2", "default")

// update
dep2 := newDeploymentTestDeploymentV2()
_, err := mocks.kubeClient.AppsV1().Deployments("default").Update(context.TODO(), dep2, metav1.UpdateOptions{})
require.NoError(t, err)

// detect pod spec changes
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo2", "default")

// if no maxConcurrentCanaries is set, all canaries should proceed
c, err := mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
require.NoError(t, err)
assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)

c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
require.NoError(t, err)
assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)

// delete second canary and set maxConcurrency. Then add it again
delete(mocks.ctrl.pendingCanaries, "podinfo2.default")
mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Delete(secondCanary)
mocks.ctrl.maxConcurrentCanaries = 1
mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Add(secondCanary)

mocks.ctrl.advanceCanary("podinfo2", "default")
mocks.ctrl.advanceCanary("podinfo2", "default")
_, err = mocks.kubeClient.AppsV1().Deployments("default").Update(context.TODO(), dep2, metav1.UpdateOptions{})
require.NoError(t, err)

// check if second canary is waiting now
c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
mocks.ctrl.advanceCanary("podinfo2", "default")
require.NoError(t, err)
assert.Equal(t, flaggerv1.CanaryPhaseWaiting, c.Status.Phase)

// make first deployment succeeded
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo", "default")
mocks.ctrl.advanceCanary("podinfo", "default")

// after succeeded it should get removed from pendingCanaries
mocks.ctrl.advanceCanary("podinfo", "default")

// second canary should start with next call
mocks.ctrl.advanceCanary("podinfo2", "default")

// check if second canary is starting
c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo2", metav1.GetOptions{})
require.NoError(t, err)
assert.Equal(t, flaggerv1.CanaryPhaseProgressing, c.Status.Phase)
}

func TestScheduler_DeploymentMirroring(t *testing.T) {
mocks := newDeploymentFixture(newDeploymentTestCanaryMirror())
