From c4e2d4fb80721272637b466a34d891492d011c1b Mon Sep 17 00:00:00 2001 From: Juranir Santos Date: Tue, 2 Apr 2024 09:27:08 +0100 Subject: [PATCH 01/33] fix: Refactor the func newConfigMapInformer to watch both main and mgmt ns. Fixes #11657 Signed-off-by: Juranir Santos --- workflow/controller/controller.go | 161 ++++++++++++++++++------- workflow/controller/controller_test.go | 3 +- 2 files changed, 121 insertions(+), 43 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 1c8764a081a8..77e75427d9cb 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -12,6 +12,8 @@ import ( "github.com/upper/db/v4" + "github.com/argoproj/argo-workflows/v3" + "github.com/argoproj/pkg/errors" syncpkg "github.com/argoproj/pkg/sync" log "github.com/sirupsen/logrus" @@ -39,7 +41,6 @@ import ( "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" - "github.com/argoproj/argo-workflows/v3" "github.com/argoproj/argo-workflows/v3/config" argoErr "github.com/argoproj/argo-workflows/v3/errors" "github.com/argoproj/argo-workflows/v3/persist/sqldb" @@ -124,6 +125,7 @@ type WorkflowController struct { cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer podInformer cache.SharedIndexInformer configMapInformer cache.SharedIndexInformer + configMapInformerMgmt cache.SharedIndexInformer wfQueue workqueue.RateLimitingInterface podCleanupQueue workqueue.RateLimitingInterface // pods to be deleted or labelled depend on GC strategy throttler sync.Throttler @@ -319,11 +321,18 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo } wfc.updateEstimatorFactory() - wfc.configMapInformer, err = wfc.newConfigMapInformer() + wfc.configMapInformer, err = wfc.newConfigMapInformer(wfc.GetNamespace()) if err != nil { log.Fatal(err) } + if wfc.isManagedNamespaceDifferent() { + wfc.configMapInformerMgmt, err = wfc.newConfigMapInformer(wfc.GetManagedNamespace()) + if err != nil { + log.Fatal(err) + } + } + // Create Synchronization Manager wfc.createSynchronizationManager(ctx) // init managers: throttler and SynchronizationManager @@ -339,6 +348,11 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo go wfc.wftmplInformer.Informer().Run(ctx.Done()) go wfc.podInformer.Run(ctx.Done()) go wfc.configMapInformer.Run(ctx.Done()) + + if wfc.isManagedNamespaceDifferent() { + go wfc.configMapInformerMgmt.Run(ctx.Done()) + } + go wfc.wfTaskSetInformer.Informer().Run(ctx.Done()) go wfc.artGCTaskInformer.Informer().Run(ctx.Done()) go wfc.taskResultInformer.Run(ctx.Done()) @@ -351,6 +365,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo wfc.wftmplInformer.Informer().HasSynced, wfc.podInformer.HasSynced, wfc.configMapInformer.HasSynced, + wfc.isConfigMapInformerMgmtSynced, wfc.wfTaskSetInformer.Informer().HasSynced, wfc.artGCTaskInformer.Informer().HasSynced, wfc.taskResultInformer.HasSynced, @@ -1270,43 +1285,62 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared return informer, nil } -func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer, error) { - indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ +func (wfc *WorkflowController) newConfigMapInformer(ns string) (cache.SharedIndexInformer, error) { + indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, ns, 20*time.Minute, cache.Indexers{ indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc, }, func(opts *metav1.ListOptions) { opts.LabelSelector = common.LabelKeyConfigMapType }) + log.WithField("executorPlugins", wfc.executorPlugins != nil).Info("Plugins") - if wfc.executorPlugins != nil { - _, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - cm, err := meta.Accessor(obj) - if err != nil { - return false + + ctx := context.Background() + + _, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + cm, err := meta.Accessor(obj) + if err != nil { + log.WithError(err). + Error("failed to get configmap") + + return false + } + //return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin + if wfc.isPluginCM(cm) { + log.WithField("executorPlugins", wfc.executorPlugins != nil). + Info("Plugins") + } + + return wfc.isPluginCM(cm) || wfc.isControllerCM(cm) || wfc.isFromManagedNamespace(cm) + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + + if !wfc.isPluginCM(cm) { + return } - return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - cm := obj.(*apiv1.ConfigMap) - p, err := plugin.FromConfigMap(cm) - if err != nil { - log.WithField("namespace", cm.GetNamespace()). - WithField("name", cm.GetName()). - WithError(err). - Error("failed to convert configmap to plugin") - return - } - if _, ok := wfc.executorPlugins[cm.GetNamespace()]; !ok { - wfc.executorPlugins[cm.GetNamespace()] = map[string]*spec.Plugin{} - } - wfc.executorPlugins[cm.GetNamespace()][cm.GetName()] = p + + p, err := plugin.FromConfigMap(cm) + if err != nil { log.WithField("namespace", cm.GetNamespace()). WithField("name", cm.GetName()). - Info("Executor plugin added") - }, - UpdateFunc: func(_, obj interface{}) { - cm := obj.(*apiv1.ConfigMap) + WithError(err). + Error("failed to convert configmap to plugin") + return + } + if _, ok := wfc.executorPlugins[cm.GetNamespace()]; !ok { + wfc.executorPlugins[cm.GetNamespace()] = map[string]*spec.Plugin{} + } + wfc.executorPlugins[cm.GetNamespace()][cm.GetName()] = p + log.WithField("namespace", cm.GetNamespace()). + WithField("name", cm.GetName()). + Info("Executor plugin added") + }, + UpdateFunc: func(_, obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + + if wfc.isPluginCM(cm) { p, err := plugin.FromConfigMap(cm) if err != nil { log.WithField("namespace", cm.GetNamespace()). @@ -1320,21 +1354,36 @@ func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer log.WithField("namespace", cm.GetNamespace()). WithField("name", cm.GetName()). Info("Executor plugin updated") - }, - DeleteFunc: func(obj interface{}) { - key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - namespace, name, _ := cache.SplitMetaNamespaceKey(key) - delete(wfc.executorPlugins[namespace], name) - log.WithField("namespace", namespace).WithField("name", name).Info("Executor plugin removed") - }, + + } else if wfc.isControllerCM(cm) { + log.Infof("Received Workflow Controller config map %s/%s update", cm.GetNamespace(), cm.GetName()) + wfc.UpdateConfig(ctx) + } + + if wfc.isFromManagedNamespace(cm) { + wfc.notifySemaphoreConfigUpdate(cm) + } }, - }) - if err != nil { - return nil, err - } + DeleteFunc: func(obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + if !wfc.isPluginCM(cm) { + return + } + + key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + namespace, name, _ := cache.SplitMetaNamespaceKey(key) + delete(wfc.executorPlugins[namespace], name) + log.WithField("namespace", namespace).WithField("name", name).Info("Executor plugin removed") + }, + }, + }) + + if err != nil { + return nil, err } return indexInformer, nil + } // call this func whenever the configuration changes, or when the workflow informer changes @@ -1506,3 +1555,31 @@ func (wfc *WorkflowController) newArtGCTaskInformer() (wfextvv1alpha1.WorkflowAr } return informer, nil } + +func (wfc *WorkflowController) isManagedNamespaceDifferent() bool { + return wfc.GetNamespace() != wfc.GetManagedNamespace() +} + +func (wfc *WorkflowController) GetNamespace() string { + return wfc.Config.Namespace +} + +func (wfc *WorkflowController) isPluginCM(cm metav1.Object) bool { + return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin +} + +func (wfc *WorkflowController) isControllerCM(cm metav1.Object) bool { + return cm.GetName() == wfc.configController.GetName() +} + +func (wfc *WorkflowController) isFromManagedNamespace(cm metav1.Object) bool { + return cm.GetNamespace() == wfc.GetManagedNamespace() +} + +func (wfc *WorkflowController) isConfigMapInformerMgmtSynced() bool { + if wfc.isManagedNamespaceDifferent() { + return wfc.configMapInformerMgmt.HasSynced() + } + + return true +} diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 7522d368918a..78f03bd57a42 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -322,7 +322,8 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates() _ = wfc.addWorkflowInformerHandlers(ctx) wfc.podInformer, _ = wfc.newPodInformer(ctx) - wfc.configMapInformer, _ = wfc.newConfigMapInformer() + wfc.configMapInformerMgmt, _ = wfc.newConfigMapInformer(wfc.GetManagedNamespace()) + wfc.configMapInformer, _ = wfc.newConfigMapInformer(wfc.GetNamespace()) wfc.createSynchronizationManager(ctx) _ = wfc.initManagers(ctx) From 9006211fccf9d985c3050e34200585f7fb0fb475 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 19:47:13 -0400 Subject: [PATCH 02/33] refactor out an applyPluginCM func Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 67 ++++++++++++------------------- 1 file changed, 25 insertions(+), 42 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 77e75427d9cb..f4d18b5dfd49 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1305,57 +1305,19 @@ func (wfc *WorkflowController) newConfigMapInformer(ns string) (cache.SharedInde return false } - //return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin - if wfc.isPluginCM(cm) { - log.WithField("executorPlugins", wfc.executorPlugins != nil). - Info("Plugins") - } return wfc.isPluginCM(cm) || wfc.isControllerCM(cm) || wfc.isFromManagedNamespace(cm) }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { cm := obj.(*apiv1.ConfigMap) - - if !wfc.isPluginCM(cm) { - return - } - - p, err := plugin.FromConfigMap(cm) - if err != nil { - log.WithField("namespace", cm.GetNamespace()). - WithField("name", cm.GetName()). - WithError(err). - Error("failed to convert configmap to plugin") - return - } - if _, ok := wfc.executorPlugins[cm.GetNamespace()]; !ok { - wfc.executorPlugins[cm.GetNamespace()] = map[string]*spec.Plugin{} - } - wfc.executorPlugins[cm.GetNamespace()][cm.GetName()] = p - log.WithField("namespace", cm.GetNamespace()). - WithField("name", cm.GetName()). - Info("Executor plugin added") + wfc.applyPluginCM(cm, "added") }, UpdateFunc: func(_, obj interface{}) { cm := obj.(*apiv1.ConfigMap) + wfc.applyPluginCM(cm, "updated") - if wfc.isPluginCM(cm) { - p, err := plugin.FromConfigMap(cm) - if err != nil { - log.WithField("namespace", cm.GetNamespace()). - WithField("name", cm.GetName()). - WithError(err). - Error("failed to convert configmap to plugin") - return - } - - wfc.executorPlugins[cm.GetNamespace()][cm.GetName()] = p - log.WithField("namespace", cm.GetNamespace()). - WithField("name", cm.GetName()). - Info("Executor plugin updated") - - } else if wfc.isControllerCM(cm) { + if wfc.isControllerCM(cm) { log.Infof("Received Workflow Controller config map %s/%s update", cm.GetNamespace(), cm.GetName()) wfc.UpdateConfig(ctx) } @@ -1366,7 +1328,6 @@ func (wfc *WorkflowController) newConfigMapInformer(ns string) (cache.SharedInde }, DeleteFunc: func(obj interface{}) { cm := obj.(*apiv1.ConfigMap) - if !wfc.isPluginCM(cm) { return } @@ -1411,6 +1372,28 @@ func (wfc *WorkflowController) GetManagedNamespace() string { return wfc.Config.Namespace } +func (wfc *WorkflowController) applyPluginCM(cm metav1.Object, verb string) { + if !wfc.isPluginCM(cm) { + return + } + + p, err := plugin.FromConfigMap(cm) + if err != nil { + log.WithField("namespace", cm.GetNamespace()). + WithField("name", cm.GetName()). + WithError(err). + Error("failed to convert configmap to plugin") + return + } + if _, ok := wfc.executorPlugins[cm.GetNamespace()]; !ok { + wfc.executorPlugins[cm.GetNamespace()] = map[string]*spec.Plugin{} + } + wfc.executorPlugins[cm.GetNamespace()][cm.GetName()] = p + log.WithField("namespace", cm.GetNamespace()). + WithField("name", cm.GetName()). + Infof("Executor plugin %s", verb) + } + func (wfc *WorkflowController) getMaxStackDepth() int { return maxAllowedStackDepth } From dbb84d7a3fbd3362c652da30a8cbd96053a69c02 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 19:59:22 -0400 Subject: [PATCH 03/33] renames Signed-off-by: Anton Gilgur --- workflow/controller/cache_gc.go | 2 +- workflow/controller/controller.go | 29 +++++++++++++------------- workflow/controller/controller_test.go | 4 ++-- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/workflow/controller/cache_gc.go b/workflow/controller/cache_gc.go index 5d641c7407ec..d56032588a72 100644 --- a/workflow/controller/cache_gc.go +++ b/workflow/controller/cache_gc.go @@ -25,7 +25,7 @@ func init() { // syncAllCacheForGC syncs all cache for GC func (wfc *WorkflowController) syncAllCacheForGC(ctx context.Context) { - configMaps, err := wfc.configMapInformer.GetIndexer().ByIndex(indexes.ConfigMapLabelsIndex, common.LabelValueTypeConfigMapCache) + configMaps, err := wfc.cmInformer.GetIndexer().ByIndex(indexes.ConfigMapLabelsIndex, common.LabelValueTypeConfigMapCache) if err != nil { log.WithError(err).Error("Failed to get configmaps from informer") return diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index f4d18b5dfd49..c8e34416ce4e 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -124,8 +124,8 @@ type WorkflowController struct { wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer podInformer cache.SharedIndexInformer - configMapInformer cache.SharedIndexInformer - configMapInformerMgmt cache.SharedIndexInformer + cmInformer cache.SharedIndexInformer + cmInformerManaged cache.SharedIndexInformer wfQueue workqueue.RateLimitingInterface podCleanupQueue workqueue.RateLimitingInterface // pods to be deleted or labelled depend on GC strategy throttler sync.Throttler @@ -321,13 +321,13 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo } wfc.updateEstimatorFactory() - wfc.configMapInformer, err = wfc.newConfigMapInformer(wfc.GetNamespace()) + wfc.cmInformer, err = wfc.newConfigMapInformer(wfc.GetNamespace()) if err != nil { log.Fatal(err) } if wfc.isManagedNamespaceDifferent() { - wfc.configMapInformerMgmt, err = wfc.newConfigMapInformer(wfc.GetManagedNamespace()) + wfc.cmInformerManaged, err = wfc.newConfigMapInformer(wfc.GetManagedNamespace()) if err != nil { log.Fatal(err) } @@ -347,10 +347,9 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo go wfc.wfInformer.Run(ctx.Done()) go wfc.wftmplInformer.Informer().Run(ctx.Done()) go wfc.podInformer.Run(ctx.Done()) - go wfc.configMapInformer.Run(ctx.Done()) - + go wfc.cmInformer.Run(ctx.Done()) if wfc.isManagedNamespaceDifferent() { - go wfc.configMapInformerMgmt.Run(ctx.Done()) + go wfc.cmInformerManaged.Run(ctx.Done()) } go wfc.wfTaskSetInformer.Informer().Run(ctx.Done()) @@ -364,8 +363,8 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo wfc.wfInformer.HasSynced, wfc.wftmplInformer.Informer().HasSynced, wfc.podInformer.HasSynced, - wfc.configMapInformer.HasSynced, - wfc.isConfigMapInformerMgmtSynced, + wfc.cmInformer.HasSynced, + wfc.isCMInformerManagedSynced, wfc.wfTaskSetInformer.Informer().HasSynced, wfc.artGCTaskInformer.Informer().HasSynced, wfc.taskResultInformer.HasSynced, @@ -1306,7 +1305,7 @@ func (wfc *WorkflowController) newConfigMapInformer(ns string) (cache.SharedInde return false } - return wfc.isPluginCM(cm) || wfc.isControllerCM(cm) || wfc.isFromManagedNamespace(cm) + return wfc.isPluginCM(cm) || wfc.isControllerCM(cm) || wfc.isManagedNamespaceCM(cm) }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -1322,7 +1321,7 @@ func (wfc *WorkflowController) newConfigMapInformer(ns string) (cache.SharedInde wfc.UpdateConfig(ctx) } - if wfc.isFromManagedNamespace(cm) { + if wfc.isManagedNamespaceCM(cm) { wfc.notifySemaphoreConfigUpdate(cm) } }, @@ -1392,7 +1391,7 @@ func (wfc *WorkflowController) applyPluginCM(cm metav1.Object, verb string) { log.WithField("namespace", cm.GetNamespace()). WithField("name", cm.GetName()). Infof("Executor plugin %s", verb) - } +} func (wfc *WorkflowController) getMaxStackDepth() int { return maxAllowedStackDepth @@ -1555,13 +1554,13 @@ func (wfc *WorkflowController) isControllerCM(cm metav1.Object) bool { return cm.GetName() == wfc.configController.GetName() } -func (wfc *WorkflowController) isFromManagedNamespace(cm metav1.Object) bool { +func (wfc *WorkflowController) isManagedNamespaceCM(cm metav1.Object) bool { return cm.GetNamespace() == wfc.GetManagedNamespace() } -func (wfc *WorkflowController) isConfigMapInformerMgmtSynced() bool { +func (wfc *WorkflowController) isCMInformerManagedSynced() bool { if wfc.isManagedNamespaceDifferent() { - return wfc.configMapInformerMgmt.HasSynced() + return wfc.cmInformerManaged.HasSynced() } return true diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 78f03bd57a42..3abdcd030755 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -322,8 +322,8 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates() _ = wfc.addWorkflowInformerHandlers(ctx) wfc.podInformer, _ = wfc.newPodInformer(ctx) - wfc.configMapInformerMgmt, _ = wfc.newConfigMapInformer(wfc.GetManagedNamespace()) - wfc.configMapInformer, _ = wfc.newConfigMapInformer(wfc.GetNamespace()) + wfc.cmInformer, _ = wfc.newConfigMapInformer(wfc.GetNamespace()) + wfc.cmInformerManaged, _ = wfc.newConfigMapInformer(wfc.GetManagedNamespace()) wfc.createSynchronizationManager(ctx) _ = wfc.initManagers(ctx) From cd91b70f5b0d11bb2b5c1b593b09858523a07443 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 20:02:03 -0400 Subject: [PATCH 04/33] move some functions around Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 34 +++++++++++++++---------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index c8e34416ce4e..23a49a1b337c 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1364,11 +1364,27 @@ func (wfc *WorkflowController) setWorkflowDefaults(wf *wfv1.Workflow) error { return nil } +func (wfc *WorkflowController) GetNamespace() string { + return wfc.Config.Namespace +} + func (wfc *WorkflowController) GetManagedNamespace() string { if wfc.managedNamespace != "" { return wfc.managedNamespace } - return wfc.Config.Namespace + return wfc.GetNamespace() +} + +func (wfc *WorkflowController) isManagedNamespaceCM(cm metav1.Object) bool { + return cm.GetNamespace() == wfc.GetManagedNamespace() +} + +func (wfc *WorkflowController) isControllerCM(cm metav1.Object) bool { + return cm.GetName() == wfc.configController.GetName() +} + +func (wfc *WorkflowController) isPluginCM(cm metav1.Object) bool { + return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin } func (wfc *WorkflowController) applyPluginCM(cm metav1.Object, verb string) { @@ -1542,22 +1558,6 @@ func (wfc *WorkflowController) isManagedNamespaceDifferent() bool { return wfc.GetNamespace() != wfc.GetManagedNamespace() } -func (wfc *WorkflowController) GetNamespace() string { - return wfc.Config.Namespace -} - -func (wfc *WorkflowController) isPluginCM(cm metav1.Object) bool { - return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin -} - -func (wfc *WorkflowController) isControllerCM(cm metav1.Object) bool { - return cm.GetName() == wfc.configController.GetName() -} - -func (wfc *WorkflowController) isManagedNamespaceCM(cm metav1.Object) bool { - return cm.GetNamespace() == wfc.GetManagedNamespace() -} - func (wfc *WorkflowController) isCMInformerManagedSynced() bool { if wfc.isManagedNamespaceDifferent() { return wfc.cmInformerManaged.HasSynced() From 8096a0607dcd96642bb2a100bc385ccd217c9f1c Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 20:03:42 -0400 Subject: [PATCH 05/33] add ns checks Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 23a49a1b337c..8ad5ec69b2b0 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1380,11 +1380,11 @@ func (wfc *WorkflowController) isManagedNamespaceCM(cm metav1.Object) bool { } func (wfc *WorkflowController) isControllerCM(cm metav1.Object) bool { - return cm.GetName() == wfc.configController.GetName() + return cm.GetName() == wfc.configController.GetName() && cm.GetNamespace() == wfc.GetNamespace() } func (wfc *WorkflowController) isPluginCM(cm metav1.Object) bool { - return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin + return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin && wfc.isManagedNamespaceCM(cm) } func (wfc *WorkflowController) applyPluginCM(cm metav1.Object, verb string) { From 6babe0e1232b7683ecadd9b8a243f141082d0209 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 20:09:09 -0400 Subject: [PATCH 06/33] comments Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 8ad5ec69b2b0..a14697fddc0c 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -124,8 +124,8 @@ type WorkflowController struct { wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer podInformer cache.SharedIndexInformer - cmInformer cache.SharedIndexInformer - cmInformerManaged cache.SharedIndexInformer + cmInformer cache.SharedIndexInformer // configmaps in own ns + cmInformerManaged cache.SharedIndexInformer // configmaps in managed ns wfQueue workqueue.RateLimitingInterface podCleanupQueue workqueue.RateLimitingInterface // pods to be deleted or labelled depend on GC strategy throttler sync.Throttler @@ -1300,7 +1300,7 @@ func (wfc *WorkflowController) newConfigMapInformer(ns string) (cache.SharedInde cm, err := meta.Accessor(obj) if err != nil { log.WithError(err). - Error("failed to get configmap") + Error("failed to get configmap metadata") return false } From 1d4015d1e5ff376f7af5a429ce95b51d19ae1511 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 20:11:25 -0400 Subject: [PATCH 07/33] fix some more renames Signed-off-by: Anton Gilgur --- workflow/controller/dag.go | 2 +- workflow/controller/operator.go | 4 ++-- workflow/controller/workflowpod.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index fd60b5262eed..e393b6317574 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -439,7 +439,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex } } - processedTmpl, err := common.ProcessArgs(tmpl, &task.Arguments, woc.globalParams, map[string]string{}, true, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer()) + processedTmpl, err := common.ProcessArgs(tmpl, &task.Arguments, woc.globalParams, map[string]string{}, true, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer()) if err != nil { woc.markNodeError(node.Name, err) } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 0e93b2a9b397..7bb3c6ba9fe5 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -639,7 +639,7 @@ func (woc *wfOperationCtx) setGlobalParameters(executionParameters wfv1.Argument } for _, param := range executionParameters.Parameters { if param.ValueFrom != nil && param.ValueFrom.ConfigMapKeyRef != nil { - cmValue, err := common.GetConfigMapValue(woc.controller.configMapInformer.GetIndexer(), woc.wf.ObjectMeta.Namespace, param.ValueFrom.ConfigMapKeyRef.Name, param.ValueFrom.ConfigMapKeyRef.Key) + cmValue, err := common.GetConfigMapValue(woc.controller.cmInformer.GetIndexer(), woc.wf.ObjectMeta.Namespace, param.ValueFrom.ConfigMapKeyRef.Name, param.ValueFrom.ConfigMapKeyRef.Key) if err != nil { if param.ValueFrom.Default != nil { woc.globalParams["workflow.parameters."+param.Name] = param.ValueFrom.Default.String() @@ -1920,7 +1920,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, } // Inputs has been processed with arguments already, so pass empty arguments. - processedTmpl, err := common.ProcessArgs(resolvedTmpl, &args, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer()) + processedTmpl, err := common.ProcessArgs(resolvedTmpl, &args, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer()) if err != nil { return woc.initializeNodeOrMarkError(node, nodeName, templateScope, orgTmpl, opts.boundaryID, opts.nodeFlag, err), err } diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 8a14a0170594..92e14799e303 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -364,7 +364,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin if tmpl.IsPodType() { localParams[common.LocalVarPodName] = pod.Name } - tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer()) + tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer()) if err != nil { return nil, errors.Wrap(err, "", "Failed to substitute the PodSpecPatch variables") } From 7f9c0c5c83553b75850fe3b49f52caa0ddfe969b Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 20:16:34 -0400 Subject: [PATCH 08/33] also refactor out delete plugin for consistency Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index a14697fddc0c..6084a662cd3c 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1327,14 +1327,7 @@ func (wfc *WorkflowController) newConfigMapInformer(ns string) (cache.SharedInde }, DeleteFunc: func(obj interface{}) { cm := obj.(*apiv1.ConfigMap) - if !wfc.isPluginCM(cm) { - return - } - - key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - namespace, name, _ := cache.SplitMetaNamespaceKey(key) - delete(wfc.executorPlugins[namespace], name) - log.WithField("namespace", namespace).WithField("name", name).Info("Executor plugin removed") + wfc.deletePluginCM(cm, obj) }, }, }) @@ -1409,6 +1402,17 @@ func (wfc *WorkflowController) applyPluginCM(cm metav1.Object, verb string) { Infof("Executor plugin %s", verb) } +func (wfc *WorkflowController) deletePluginCM(cm metav1.Object, obj interface{}) { + if !wfc.isPluginCM(cm) { + return + } + + key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + namespace, name, _ := cache.SplitMetaNamespaceKey(key) + delete(wfc.executorPlugins[namespace], name) + log.WithField("namespace", namespace).WithField("name", name).Info("Executor plugin removed") +} + func (wfc *WorkflowController) getMaxStackDepth() int { return maxAllowedStackDepth } From 59f23bcf3c1deff57d548b4b78108d8c5be4fd77 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 20:24:01 -0400 Subject: [PATCH 09/33] properly remove/replace runConfigMapWatcher Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 54 +++++--------------------- workflow/controller/controller_test.go | 1 + 2 files changed, 10 insertions(+), 45 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 6084a662cd3c..1e8623605098 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -37,7 +37,6 @@ import ( typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - apiwatch "k8s.io/client-go/tools/watch" "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" @@ -340,10 +339,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo log.Fatal(err) } - if os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false" { - go wfc.runConfigMapWatcher(ctx.Done()) - } - go wfc.wfInformer.Run(ctx.Done()) go wfc.wftmplInformer.Informer().Run(ctx.Done()) go wfc.podInformer.Run(ctx.Done()) @@ -455,40 +450,6 @@ func (wfc *WorkflowController) initManagers(ctx context.Context) error { return nil } -func (wfc *WorkflowController) runConfigMapWatcher(stopCh <-chan struct{}) { - defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) - - ctx := context.Background() - retryWatcher, err := apiwatch.NewRetryWatcher("1", &cache.ListWatch{ - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return wfc.kubeclientset.CoreV1().ConfigMaps(wfc.managedNamespace).Watch(ctx, metav1.ListOptions{}) - }, - }) - if err != nil { - panic(err) - } - defer retryWatcher.Stop() - - for { - select { - case event := <-retryWatcher.ResultChan(): - cm, ok := event.Object.(*apiv1.ConfigMap) - if !ok { - log.Errorf("invalid config map object received in config watcher. Ignored processing") - continue - } - log.Debugf("received config map %s/%s update", cm.Namespace, cm.Name) - if cm.GetName() == wfc.configController.GetName() && wfc.namespace == cm.GetNamespace() { - log.Infof("Received Workflow Controller config map %s/%s update", cm.Namespace, cm.Name) - wfc.UpdateConfig(ctx) - } - wfc.notifySemaphoreConfigUpdate(cm) - case <-stopCh: - return - } - } -} - // notifySemaphoreConfigUpdate will notify semaphore config update to pending workflows func (wfc *WorkflowController) notifySemaphoreConfigUpdate(cm *apiv1.ConfigMap) { wfs, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.SemaphoreConfigIndexName, fmt.Sprintf("%s/%s", cm.Namespace, cm.Name)) @@ -1316,13 +1277,16 @@ func (wfc *WorkflowController) newConfigMapInformer(ns string) (cache.SharedInde cm := obj.(*apiv1.ConfigMap) wfc.applyPluginCM(cm, "updated") - if wfc.isControllerCM(cm) { - log.Infof("Received Workflow Controller config map %s/%s update", cm.GetNamespace(), cm.GetName()) - wfc.UpdateConfig(ctx) - } + if os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false" { + log.Debugf("received config map %s/%s update", cm.GetNamespace(), cm.GetName()) + if wfc.isControllerCM(cm) { + log.Infof("Received Workflow Controller config map %s/%s update", cm.GetNamespace(), cm.GetName()) + wfc.UpdateConfig(ctx) + } - if wfc.isManagedNamespaceCM(cm) { - wfc.notifySemaphoreConfigUpdate(cm) + if wfc.isManagedNamespaceCM(cm) { + wfc.notifySemaphoreConfigUpdate(cm) + } } }, DeleteFunc: func(obj interface{}) { diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 3abdcd030755..7c5a1cf2194d 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -324,6 +324,7 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl wfc.podInformer, _ = wfc.newPodInformer(ctx) wfc.cmInformer, _ = wfc.newConfigMapInformer(wfc.GetNamespace()) wfc.cmInformerManaged, _ = wfc.newConfigMapInformer(wfc.GetManagedNamespace()) + wfc.createSynchronizationManager(ctx) _ = wfc.initManagers(ctx) From 545826f006b82a43779fd7b930c6af7296058112 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 20:28:30 -0400 Subject: [PATCH 10/33] typings Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 1e8623605098..12e2bbb38ec8 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1332,19 +1332,19 @@ func (wfc *WorkflowController) GetManagedNamespace() string { return wfc.GetNamespace() } -func (wfc *WorkflowController) isManagedNamespaceCM(cm metav1.Object) bool { +func (wfc *WorkflowController) isManagedNamespaceCM(cm *apiv1.ConfigMap) bool { return cm.GetNamespace() == wfc.GetManagedNamespace() } -func (wfc *WorkflowController) isControllerCM(cm metav1.Object) bool { +func (wfc *WorkflowController) isControllerCM(cm *apiv1.ConfigMap) bool { return cm.GetName() == wfc.configController.GetName() && cm.GetNamespace() == wfc.GetNamespace() } -func (wfc *WorkflowController) isPluginCM(cm metav1.Object) bool { +func (wfc *WorkflowController) isPluginCM(cm *apiv1.ConfigMap) bool { return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin && wfc.isManagedNamespaceCM(cm) } -func (wfc *WorkflowController) applyPluginCM(cm metav1.Object, verb string) { +func (wfc *WorkflowController) applyPluginCM(cm *apiv1.ConfigMap, verb string) { if !wfc.isPluginCM(cm) { return } @@ -1366,7 +1366,7 @@ func (wfc *WorkflowController) applyPluginCM(cm metav1.Object, verb string) { Infof("Executor plugin %s", verb) } -func (wfc *WorkflowController) deletePluginCM(cm metav1.Object, obj interface{}) { +func (wfc *WorkflowController) deletePluginCM(cm *apiv1.ConfigMap, obj interface{}) { if !wfc.isPluginCM(cm) { return } From 963158a07b9ee4bfdfcc6b2893dc88d6221cc03d Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 20:31:33 -0400 Subject: [PATCH 11/33] woops not all those typings Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 12e2bbb38ec8..0f34a0e3e009 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1332,15 +1332,15 @@ func (wfc *WorkflowController) GetManagedNamespace() string { return wfc.GetNamespace() } -func (wfc *WorkflowController) isManagedNamespaceCM(cm *apiv1.ConfigMap) bool { +func (wfc *WorkflowController) isManagedNamespaceCM(cm metav1.Object) bool { return cm.GetNamespace() == wfc.GetManagedNamespace() } -func (wfc *WorkflowController) isControllerCM(cm *apiv1.ConfigMap) bool { +func (wfc *WorkflowController) isControllerCM(cm metav1.Object) bool { return cm.GetName() == wfc.configController.GetName() && cm.GetNamespace() == wfc.GetNamespace() } -func (wfc *WorkflowController) isPluginCM(cm *apiv1.ConfigMap) bool { +func (wfc *WorkflowController) isPluginCM(cm metav1.Object) bool { return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin && wfc.isManagedNamespaceCM(cm) } From 43527e9c8073751cbd818b7bc48b7f14d90f5840 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 22:08:47 -0400 Subject: [PATCH 12/33] use two configmap informers, one for managed ns, other for self ns - they can overlap in one case, but this is fine - we don't need a `FilteringResourceEventHandler` anymore: - for managed, we don't filter them, it's all configmaps in the ns - could potentially split two event handlers though? - for self, we have a field selector on the informer itself (which I believe is (much) more efficient) - we no longer need the `isManagedNamespaceCM`, `isControllerCM` funcs - nor `isManagedNamespaceDifferent`, `isCMInformerManagedSynced` - the `WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS` flag is not very optimized, we could technically skip the entire self configmap informer with that now - but it may be removed as it was a workaround, so don't really want to add too much code for it - also if we add any other configmaps in the controller ns, we wouldn't have to change as much - also properly pass `context` for the `UpdateConfig` call Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 122 ++++++++++++------------------ 1 file changed, 48 insertions(+), 74 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 0f34a0e3e009..ba14ecf7325b 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -12,8 +12,6 @@ import ( "github.com/upper/db/v4" - "github.com/argoproj/argo-workflows/v3" - "github.com/argoproj/pkg/errors" syncpkg "github.com/argoproj/pkg/sync" log "github.com/sirupsen/logrus" @@ -21,9 +19,9 @@ import ( "golang.org/x/time/rate" apiv1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" @@ -40,6 +38,7 @@ import ( "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" + "github.com/argoproj/argo-workflows/v3" "github.com/argoproj/argo-workflows/v3/config" argoErr "github.com/argoproj/argo-workflows/v3/errors" "github.com/argoproj/argo-workflows/v3/persist/sqldb" @@ -320,16 +319,13 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo } wfc.updateEstimatorFactory() - wfc.cmInformer, err = wfc.newConfigMapInformer(wfc.GetNamespace()) + wfc.cmInformer, err = wfc.newConfigMapInformer(ctx) if err != nil { log.Fatal(err) } - - if wfc.isManagedNamespaceDifferent() { - wfc.cmInformerManaged, err = wfc.newConfigMapInformer(wfc.GetManagedNamespace()) - if err != nil { - log.Fatal(err) - } + wfc.cmInformerManaged, err = wfc.newConfigMapInformerManaged() + if err != nil { + log.Fatal(err) } // Create Synchronization Manager @@ -343,9 +339,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo go wfc.wftmplInformer.Informer().Run(ctx.Done()) go wfc.podInformer.Run(ctx.Done()) go wfc.cmInformer.Run(ctx.Done()) - if wfc.isManagedNamespaceDifferent() { - go wfc.cmInformerManaged.Run(ctx.Done()) - } + go wfc.cmInformerManaged.Run(ctx.Done()) go wfc.wfTaskSetInformer.Informer().Run(ctx.Done()) go wfc.artGCTaskInformer.Informer().Run(ctx.Done()) @@ -359,7 +353,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo wfc.wftmplInformer.Informer().HasSynced, wfc.podInformer.HasSynced, wfc.cmInformer.HasSynced, - wfc.isCMInformerManagedSynced, + wfc.cmInformerManaged.HasSynced, wfc.wfTaskSetInformer.Informer().HasSynced, wfc.artGCTaskInformer.Informer().HasSynced, wfc.taskResultInformer.HasSynced, @@ -1245,54 +1239,32 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared return informer, nil } -func (wfc *WorkflowController) newConfigMapInformer(ns string) (cache.SharedIndexInformer, error) { - indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, ns, 20*time.Minute, cache.Indexers{ +func (wfc *WorkflowController) newConfigMapInformerManaged() (cache.SharedIndexInformer, error) { + indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc, - }, func(opts *metav1.ListOptions) { - opts.LabelSelector = common.LabelKeyConfigMapType }) log.WithField("executorPlugins", wfc.executorPlugins != nil).Info("Plugins") - ctx := context.Background() - - _, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - cm, err := meta.Accessor(obj) - if err != nil { - log.WithError(err). - Error("failed to get configmap metadata") + _, err := indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + wfc.applyPluginCM(cm, "added") + }, + UpdateFunc: func(_, obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + wfc.applyPluginCM(cm, "updated") - return false + if os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") == "false" { + return } - return wfc.isPluginCM(cm) || wfc.isControllerCM(cm) || wfc.isManagedNamespaceCM(cm) + log.Debugf("received config map %s/%s update", cm.GetNamespace(), cm.GetName()) + wfc.notifySemaphoreConfigUpdate(cm) }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - cm := obj.(*apiv1.ConfigMap) - wfc.applyPluginCM(cm, "added") - }, - UpdateFunc: func(_, obj interface{}) { - cm := obj.(*apiv1.ConfigMap) - wfc.applyPluginCM(cm, "updated") - - if os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false" { - log.Debugf("received config map %s/%s update", cm.GetNamespace(), cm.GetName()) - if wfc.isControllerCM(cm) { - log.Infof("Received Workflow Controller config map %s/%s update", cm.GetNamespace(), cm.GetName()) - wfc.UpdateConfig(ctx) - } - - if wfc.isManagedNamespaceCM(cm) { - wfc.notifySemaphoreConfigUpdate(cm) - } - } - }, - DeleteFunc: func(obj interface{}) { - cm := obj.(*apiv1.ConfigMap) - wfc.deletePluginCM(cm, obj) - }, + DeleteFunc: func(obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + wfc.deletePluginCM(cm, obj) }, }) @@ -1300,7 +1272,29 @@ func (wfc *WorkflowController) newConfigMapInformer(ns string) (cache.SharedInde return nil, err } return indexInformer, nil +} + +func (wfc *WorkflowController) newConfigMapInformer(ctx context.Context) (cache.SharedIndexInformer, error) { + indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetNamespace(), 20*time.Minute, nil, func(opts *metav1.ListOptions) { + opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", wfc.configController.GetName()) // only the controller configmap + }) + + _, err := indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(_, obj interface{}) { + if os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") == "false" { + return + } + cm := obj.(*apiv1.ConfigMap) + log.Infof("Received Workflow Controller config map %s/%s update", cm.GetNamespace(), cm.GetName()) + wfc.UpdateConfig(ctx) + }, + }) + + if err != nil { + return nil, err + } + return indexInformer, nil } // call this func whenever the configuration changes, or when the workflow informer changes @@ -1332,16 +1326,8 @@ func (wfc *WorkflowController) GetManagedNamespace() string { return wfc.GetNamespace() } -func (wfc *WorkflowController) isManagedNamespaceCM(cm metav1.Object) bool { - return cm.GetNamespace() == wfc.GetManagedNamespace() -} - -func (wfc *WorkflowController) isControllerCM(cm metav1.Object) bool { - return cm.GetName() == wfc.configController.GetName() && cm.GetNamespace() == wfc.GetNamespace() -} - func (wfc *WorkflowController) isPluginCM(cm metav1.Object) bool { - return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin && wfc.isManagedNamespaceCM(cm) + return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin } func (wfc *WorkflowController) applyPluginCM(cm *apiv1.ConfigMap, verb string) { @@ -1521,15 +1507,3 @@ func (wfc *WorkflowController) newArtGCTaskInformer() (wfextvv1alpha1.WorkflowAr } return informer, nil } - -func (wfc *WorkflowController) isManagedNamespaceDifferent() bool { - return wfc.GetNamespace() != wfc.GetManagedNamespace() -} - -func (wfc *WorkflowController) isCMInformerManagedSynced() bool { - if wfc.isManagedNamespaceDifferent() { - return wfc.cmInformerManaged.HasSynced() - } - - return true -} From ecfdad2fabc2be4b955b6af95669ef2e7046cdb6 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 22:23:49 -0400 Subject: [PATCH 13/33] remove extra spaces Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 1 - workflow/controller/controller_test.go | 1 - 2 files changed, 2 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index ba14ecf7325b..50b00740bdd6 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -340,7 +340,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo go wfc.podInformer.Run(ctx.Done()) go wfc.cmInformer.Run(ctx.Done()) go wfc.cmInformerManaged.Run(ctx.Done()) - go wfc.wfTaskSetInformer.Informer().Run(ctx.Done()) go wfc.artGCTaskInformer.Informer().Run(ctx.Done()) go wfc.taskResultInformer.Run(ctx.Done()) diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 7c5a1cf2194d..3abdcd030755 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -324,7 +324,6 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl wfc.podInformer, _ = wfc.newPodInformer(ctx) wfc.cmInformer, _ = wfc.newConfigMapInformer(wfc.GetNamespace()) wfc.cmInformerManaged, _ = wfc.newConfigMapInformer(wfc.GetManagedNamespace()) - wfc.createSynchronizationManager(ctx) _ = wfc.initManagers(ctx) From 1250b465ee770ab1c970cd790e047933d8960605 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 22:25:59 -0400 Subject: [PATCH 14/33] field selector to string Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 50b00740bdd6..8e23d115f4c0 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1275,7 +1275,7 @@ func (wfc *WorkflowController) newConfigMapInformerManaged() (cache.SharedIndexI func (wfc *WorkflowController) newConfigMapInformer(ctx context.Context) (cache.SharedIndexInformer, error) { indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetNamespace(), 20*time.Minute, nil, func(opts *metav1.ListOptions) { - opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", wfc.configController.GetName()) // only the controller configmap + opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", wfc.configController.GetName()).String() // only the controller configmap }) _, err := indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ From 5ca38e448592942d4e1bd7fc4bf91752cdf975c8 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 23:02:15 -0400 Subject: [PATCH 15/33] fix test init Signed-off-by: Anton Gilgur --- workflow/controller/controller_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 3abdcd030755..1cc9f069a785 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -322,8 +322,8 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates() _ = wfc.addWorkflowInformerHandlers(ctx) wfc.podInformer, _ = wfc.newPodInformer(ctx) - wfc.cmInformer, _ = wfc.newConfigMapInformer(wfc.GetNamespace()) - wfc.cmInformerManaged, _ = wfc.newConfigMapInformer(wfc.GetManagedNamespace()) + wfc.cmInformer, _ = wfc.newConfigMapInformer(ctx) + wfc.cmInformerManaged, _ = wfc.newConfigMapInformerManaged() wfc.createSynchronizationManager(ctx) _ = wfc.initManagers(ctx) From 909b8e9b9fd4f9dff74939515c97ab735a1c92dc Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 23:22:12 -0400 Subject: [PATCH 16/33] revert labelSelector removal? this seems to have broke some E2Es... hmmmm Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 8e23d115f4c0..36e7a2eecfac 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1239,8 +1239,10 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared } func (wfc *WorkflowController) newConfigMapInformerManaged() (cache.SharedIndexInformer, error) { - indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ + indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc, + }, func(opts *metav1.ListOptions) { + opts.LabelSelector = common.LabelKeyConfigMapType }) log.WithField("executorPlugins", wfc.executorPlugins != nil).Info("Plugins") From edd274c1325ef17c4d3b2495e72726411be442b2 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 23:26:24 -0400 Subject: [PATCH 17/33] Revert "revert labelSelector removal? this seems to have broke some E2Es... hmmmm" This reverts commit 909b8e9b9fd4f9dff74939515c97ab735a1c92dc. Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 36e7a2eecfac..8e23d115f4c0 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1239,10 +1239,8 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared } func (wfc *WorkflowController) newConfigMapInformerManaged() (cache.SharedIndexInformer, error) { - indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ + indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc, - }, func(opts *metav1.ListOptions) { - opts.LabelSelector = common.LabelKeyConfigMapType }) log.WithField("executorPlugins", wfc.executorPlugins != nil).Info("Plugins") From e683292701af33ebaf122ca6bb4254552812f9af Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 5 Apr 2024 23:27:36 -0400 Subject: [PATCH 18/33] ah wrong informer Signed-off-by: Anton Gilgur --- workflow/controller/cache_gc.go | 2 +- workflow/controller/dag.go | 2 +- workflow/controller/operator.go | 4 ++-- workflow/controller/workflowpod.go | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/workflow/controller/cache_gc.go b/workflow/controller/cache_gc.go index d56032588a72..2a08daf7393f 100644 --- a/workflow/controller/cache_gc.go +++ b/workflow/controller/cache_gc.go @@ -25,7 +25,7 @@ func init() { // syncAllCacheForGC syncs all cache for GC func (wfc *WorkflowController) syncAllCacheForGC(ctx context.Context) { - configMaps, err := wfc.cmInformer.GetIndexer().ByIndex(indexes.ConfigMapLabelsIndex, common.LabelValueTypeConfigMapCache) + configMaps, err := wfc.cmInformerManaged.GetIndexer().ByIndex(indexes.ConfigMapLabelsIndex, common.LabelValueTypeConfigMapCache) if err != nil { log.WithError(err).Error("Failed to get configmaps from informer") return diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index e393b6317574..ede7810d173a 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -439,7 +439,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex } } - processedTmpl, err := common.ProcessArgs(tmpl, &task.Arguments, woc.globalParams, map[string]string{}, true, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer()) + processedTmpl, err := common.ProcessArgs(tmpl, &task.Arguments, woc.globalParams, map[string]string{}, true, woc.wf.Namespace, woc.controller.cmInformerManaged.GetIndexer()) if err != nil { woc.markNodeError(node.Name, err) } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 7bb3c6ba9fe5..232a2edb0c56 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -639,7 +639,7 @@ func (woc *wfOperationCtx) setGlobalParameters(executionParameters wfv1.Argument } for _, param := range executionParameters.Parameters { if param.ValueFrom != nil && param.ValueFrom.ConfigMapKeyRef != nil { - cmValue, err := common.GetConfigMapValue(woc.controller.cmInformer.GetIndexer(), woc.wf.ObjectMeta.Namespace, param.ValueFrom.ConfigMapKeyRef.Name, param.ValueFrom.ConfigMapKeyRef.Key) + cmValue, err := common.GetConfigMapValue(woc.controller.cmInformerManaged.GetIndexer(), woc.wf.ObjectMeta.Namespace, param.ValueFrom.ConfigMapKeyRef.Name, param.ValueFrom.ConfigMapKeyRef.Key) if err != nil { if param.ValueFrom.Default != nil { woc.globalParams["workflow.parameters."+param.Name] = param.ValueFrom.Default.String() @@ -1920,7 +1920,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, } // Inputs has been processed with arguments already, so pass empty arguments. - processedTmpl, err := common.ProcessArgs(resolvedTmpl, &args, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer()) + processedTmpl, err := common.ProcessArgs(resolvedTmpl, &args, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.cmInformerManaged.GetIndexer()) if err != nil { return woc.initializeNodeOrMarkError(node, nodeName, templateScope, orgTmpl, opts.boundaryID, opts.nodeFlag, err), err } diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 92e14799e303..1241946e62ab 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -364,7 +364,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin if tmpl.IsPodType() { localParams[common.LocalVarPodName] = pod.Name } - tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer()) + tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.cmInformerManaged.GetIndexer()) if err != nil { return nil, errors.Wrap(err, "", "Failed to substitute the PodSpecPatch variables") } From 9e749e2d88559bc7f228478a2bfbda0538a1f760 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Sun, 7 Apr 2024 22:15:50 -0400 Subject: [PATCH 19/33] cm is obj just with a typecast, use the same arg Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 8e23d115f4c0..7e8b5e8f5e11 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1263,7 +1263,7 @@ func (wfc *WorkflowController) newConfigMapInformerManaged() (cache.SharedIndexI }, DeleteFunc: func(obj interface{}) { cm := obj.(*apiv1.ConfigMap) - wfc.deletePluginCM(cm, obj) + wfc.deletePluginCM(cm) }, }) @@ -1351,12 +1351,12 @@ func (wfc *WorkflowController) applyPluginCM(cm *apiv1.ConfigMap, verb string) { Infof("Executor plugin %s", verb) } -func (wfc *WorkflowController) deletePluginCM(cm *apiv1.ConfigMap, obj interface{}) { +func (wfc *WorkflowController) deletePluginCM(cm *apiv1.ConfigMap) { if !wfc.isPluginCM(cm) { return } - key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(cm) namespace, name, _ := cache.SplitMetaNamespaceKey(key) delete(wfc.executorPlugins[namespace], name) log.WithField("namespace", namespace).WithField("name", name).Info("Executor plugin removed") From 579823bf21d60767f41911b37bf23fe6608d6042 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Mon, 8 Apr 2024 13:40:01 -0400 Subject: [PATCH 20/33] tiny refactor Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 7e8b5e8f5e11..287473868226 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -445,6 +445,7 @@ func (wfc *WorkflowController) initManagers(ctx context.Context) error { // notifySemaphoreConfigUpdate will notify semaphore config update to pending workflows func (wfc *WorkflowController) notifySemaphoreConfigUpdate(cm *apiv1.ConfigMap) { + log.Debugf("received semaphore config map %s/%s update", cm.GetNamespace(), cm.GetName()) wfs, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.SemaphoreConfigIndexName, fmt.Sprintf("%s/%s", cm.Namespace, cm.Name)) if err != nil { log.Errorf("failed get the workflow from informer. %v", err) @@ -1238,6 +1239,8 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared return informer, nil } +var watchControllerSemaphoreConfigMaps = os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false" + func (wfc *WorkflowController) newConfigMapInformerManaged() (cache.SharedIndexInformer, error) { indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc, @@ -1254,11 +1257,10 @@ func (wfc *WorkflowController) newConfigMapInformerManaged() (cache.SharedIndexI cm := obj.(*apiv1.ConfigMap) wfc.applyPluginCM(cm, "updated") - if os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") == "false" { + if !watchControllerSemaphoreConfigMaps { return } - log.Debugf("received config map %s/%s update", cm.GetNamespace(), cm.GetName()) wfc.notifySemaphoreConfigUpdate(cm) }, DeleteFunc: func(obj interface{}) { @@ -1275,12 +1277,12 @@ func (wfc *WorkflowController) newConfigMapInformerManaged() (cache.SharedIndexI func (wfc *WorkflowController) newConfigMapInformer(ctx context.Context) (cache.SharedIndexInformer, error) { indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetNamespace(), 20*time.Minute, nil, func(opts *metav1.ListOptions) { - opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", wfc.configController.GetName()).String() // only the controller configmap + opts.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, wfc.configController.GetName()).String() // only the controller configmap }) _, err := indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(_, obj interface{}) { - if os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") == "false" { + if !watchControllerSemaphoreConfigMaps { return } From 700dd371d844145819a2ad59e38f37220e39abf6 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Mon, 8 Apr 2024 15:01:33 -0400 Subject: [PATCH 21/33] rename again -- all other informers don't specify 'managed' despite working on the managed ns. specify ctrl for controller instead Signed-off-by: Anton Gilgur --- workflow/controller/cache_gc.go | 2 +- workflow/controller/controller.go | 14 +++++++------- workflow/controller/controller_test.go | 4 ++-- workflow/controller/dag.go | 2 +- workflow/controller/operator.go | 4 ++-- workflow/controller/workflowpod.go | 2 +- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/workflow/controller/cache_gc.go b/workflow/controller/cache_gc.go index 2a08daf7393f..d56032588a72 100644 --- a/workflow/controller/cache_gc.go +++ b/workflow/controller/cache_gc.go @@ -25,7 +25,7 @@ func init() { // syncAllCacheForGC syncs all cache for GC func (wfc *WorkflowController) syncAllCacheForGC(ctx context.Context) { - configMaps, err := wfc.cmInformerManaged.GetIndexer().ByIndex(indexes.ConfigMapLabelsIndex, common.LabelValueTypeConfigMapCache) + configMaps, err := wfc.cmInformer.GetIndexer().ByIndex(indexes.ConfigMapLabelsIndex, common.LabelValueTypeConfigMapCache) if err != nil { log.WithError(err).Error("Failed to get configmaps from informer") return diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 287473868226..35d8a8197d7b 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -122,8 +122,8 @@ type WorkflowController struct { wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer podInformer cache.SharedIndexInformer - cmInformer cache.SharedIndexInformer // configmaps in own ns - cmInformerManaged cache.SharedIndexInformer // configmaps in managed ns + cmInformer cache.SharedIndexInformer // configmaps of plugins, parameters, memoizations, etc + cmCtrlInformer cache.SharedIndexInformer // controller's own configmap wfQueue workqueue.RateLimitingInterface podCleanupQueue workqueue.RateLimitingInterface // pods to be deleted or labelled depend on GC strategy throttler sync.Throttler @@ -319,11 +319,11 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo } wfc.updateEstimatorFactory() - wfc.cmInformer, err = wfc.newConfigMapInformer(ctx) + wfc.cmInformer, err = wfc.newConfigMapInformer() if err != nil { log.Fatal(err) } - wfc.cmInformerManaged, err = wfc.newConfigMapInformerManaged() + wfc.cmCtrlInformer, err = wfc.newConfigMapCtrlInformer(ctx) if err != nil { log.Fatal(err) } @@ -339,7 +339,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo go wfc.wftmplInformer.Informer().Run(ctx.Done()) go wfc.podInformer.Run(ctx.Done()) go wfc.cmInformer.Run(ctx.Done()) - go wfc.cmInformerManaged.Run(ctx.Done()) + go wfc.cmCtrlInformer.Run(ctx.Done()) go wfc.wfTaskSetInformer.Informer().Run(ctx.Done()) go wfc.artGCTaskInformer.Informer().Run(ctx.Done()) go wfc.taskResultInformer.Run(ctx.Done()) @@ -1241,7 +1241,7 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared var watchControllerSemaphoreConfigMaps = os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false" -func (wfc *WorkflowController) newConfigMapInformerManaged() (cache.SharedIndexInformer, error) { +func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer, error) { indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc, }) @@ -1275,7 +1275,7 @@ func (wfc *WorkflowController) newConfigMapInformerManaged() (cache.SharedIndexI return indexInformer, nil } -func (wfc *WorkflowController) newConfigMapInformer(ctx context.Context) (cache.SharedIndexInformer, error) { +func (wfc *WorkflowController) newConfigMapCtrlInformer(ctx context.Context) (cache.SharedIndexInformer, error) { indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetNamespace(), 20*time.Minute, nil, func(opts *metav1.ListOptions) { opts.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, wfc.configController.GetName()).String() // only the controller configmap }) diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 1cc9f069a785..3c4023d1ce82 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -322,8 +322,8 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates() _ = wfc.addWorkflowInformerHandlers(ctx) wfc.podInformer, _ = wfc.newPodInformer(ctx) - wfc.cmInformer, _ = wfc.newConfigMapInformer(ctx) - wfc.cmInformerManaged, _ = wfc.newConfigMapInformerManaged() + wfc.cmInformer, _ = wfc.newConfigMapInformer() + wfc.cmCtrlInformer, _ = wfc.newConfigMapCtrlInformer(ctx) wfc.createSynchronizationManager(ctx) _ = wfc.initManagers(ctx) diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index ede7810d173a..e393b6317574 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -439,7 +439,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex } } - processedTmpl, err := common.ProcessArgs(tmpl, &task.Arguments, woc.globalParams, map[string]string{}, true, woc.wf.Namespace, woc.controller.cmInformerManaged.GetIndexer()) + processedTmpl, err := common.ProcessArgs(tmpl, &task.Arguments, woc.globalParams, map[string]string{}, true, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer()) if err != nil { woc.markNodeError(node.Name, err) } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 232a2edb0c56..7bb3c6ba9fe5 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -639,7 +639,7 @@ func (woc *wfOperationCtx) setGlobalParameters(executionParameters wfv1.Argument } for _, param := range executionParameters.Parameters { if param.ValueFrom != nil && param.ValueFrom.ConfigMapKeyRef != nil { - cmValue, err := common.GetConfigMapValue(woc.controller.cmInformerManaged.GetIndexer(), woc.wf.ObjectMeta.Namespace, param.ValueFrom.ConfigMapKeyRef.Name, param.ValueFrom.ConfigMapKeyRef.Key) + cmValue, err := common.GetConfigMapValue(woc.controller.cmInformer.GetIndexer(), woc.wf.ObjectMeta.Namespace, param.ValueFrom.ConfigMapKeyRef.Name, param.ValueFrom.ConfigMapKeyRef.Key) if err != nil { if param.ValueFrom.Default != nil { woc.globalParams["workflow.parameters."+param.Name] = param.ValueFrom.Default.String() @@ -1920,7 +1920,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, } // Inputs has been processed with arguments already, so pass empty arguments. - processedTmpl, err := common.ProcessArgs(resolvedTmpl, &args, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.cmInformerManaged.GetIndexer()) + processedTmpl, err := common.ProcessArgs(resolvedTmpl, &args, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer()) if err != nil { return woc.initializeNodeOrMarkError(node, nodeName, templateScope, orgTmpl, opts.boundaryID, opts.nodeFlag, err), err } diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 1241946e62ab..92e14799e303 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -364,7 +364,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin if tmpl.IsPodType() { localParams[common.LocalVarPodName] = pod.Name } - tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.cmInformerManaged.GetIndexer()) + tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer()) if err != nil { return nil, errors.Wrap(err, "", "Failed to substitute the PodSpecPatch variables") } From 91000ca41fa56a2f3c33240aeb3dbd894b3c07b3 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Mon, 8 Apr 2024 15:03:13 -0400 Subject: [PATCH 22/33] tiny optimization Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 35d8a8197d7b..b436689c7a1b 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1280,12 +1280,12 @@ func (wfc *WorkflowController) newConfigMapCtrlInformer(ctx context.Context) (ca opts.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, wfc.configController.GetName()).String() // only the controller configmap }) + if !watchControllerSemaphoreConfigMaps { + return indexInformer, nil + } + _, err := indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(_, obj interface{}) { - if !watchControllerSemaphoreConfigMaps { - return - } - cm := obj.(*apiv1.ConfigMap) log.Infof("Received Workflow Controller config map %s/%s update", cm.GetNamespace(), cm.GetName()) wfc.UpdateConfig(ctx) From abea6773f222e7f995e9a9c84cdd8183927e8e9b Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Mon, 8 Apr 2024 15:05:02 -0400 Subject: [PATCH 23/33] rename miss fix Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index b436689c7a1b..7b5b19f1bd35 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -352,7 +352,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo wfc.wftmplInformer.Informer().HasSynced, wfc.podInformer.HasSynced, wfc.cmInformer.HasSynced, - wfc.cmInformerManaged.HasSynced, + wfc.cmCtrlInformer.HasSynced, wfc.wfTaskSetInformer.Informer().HasSynced, wfc.artGCTaskInformer.Informer().HasSynced, wfc.taskResultInformer.HasSynced, From f8e0872544602a96fae155f166d49709a32b28c2 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Mon, 8 Apr 2024 17:39:10 -0400 Subject: [PATCH 24/33] refactor out a third informer for semaphores, for better filtering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - this way the plugin etc informer is more efficient, only on configmaps with the label - and the diff is (much) smaller (once you ignore whitespace from the inverted `if`) - semaphore informer only needs to watch certain configmaps -- get those names from the index - this is pretty hacky, I don't like this, but every way to track this seemed hacky 😕 - this option had the least changes though - semaphore informer only needs name and namespace -- remove the rest from its cache Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 199 ++++++++++++------ workflow/controller/controller_test.go | 3 +- workflow/controller/indexes/workflow_index.go | 13 +- .../controller/indexes/workflow_index_test.go | 3 + 4 files changed, 149 insertions(+), 69 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 7b5b19f1bd35..669385d53931 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -19,6 +19,7 @@ import ( "golang.org/x/time/rate" apiv1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" @@ -122,8 +123,9 @@ type WorkflowController struct { wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer podInformer cache.SharedIndexInformer - cmInformer cache.SharedIndexInformer // configmaps of plugins, parameters, memoizations, etc - cmCtrlInformer cache.SharedIndexInformer // controller's own configmap + cmInformer cache.SharedIndexInformer // configmaps with common.LabelKeyConfigMapType: plugins, parameters, memoizations, etc + cmControllerInformer cache.SharedIndexInformer // controller's own configmap + cmSemaphoreInformer cache.SharedIndexInformer // semaphore configmaps wfQueue workqueue.RateLimitingInterface podCleanupQueue workqueue.RateLimitingInterface // pods to be deleted or labelled depend on GC strategy throttler sync.Throttler @@ -323,7 +325,11 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo if err != nil { log.Fatal(err) } - wfc.cmCtrlInformer, err = wfc.newConfigMapCtrlInformer(ctx) + wfc.cmControllerInformer, err = wfc.newConfigMapControllerInformer(ctx) + if err != nil { + log.Fatal(err) + } + wfc.cmSemaphoreInformer, err = wfc.newConfigMapSemaphoreInformer() if err != nil { log.Fatal(err) } @@ -339,7 +345,8 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo go wfc.wftmplInformer.Informer().Run(ctx.Done()) go wfc.podInformer.Run(ctx.Done()) go wfc.cmInformer.Run(ctx.Done()) - go wfc.cmCtrlInformer.Run(ctx.Done()) + go wfc.cmControllerInformer.Run(ctx.Done()) + go wfc.cmSemaphoreInformer.Run(ctx.Done()) go wfc.wfTaskSetInformer.Informer().Run(ctx.Done()) go wfc.artGCTaskInformer.Informer().Run(ctx.Done()) go wfc.taskResultInformer.Run(ctx.Done()) @@ -352,7 +359,8 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo wfc.wftmplInformer.Informer().HasSynced, wfc.podInformer.HasSynced, wfc.cmInformer.HasSynced, - wfc.cmCtrlInformer.HasSynced, + wfc.cmControllerInformer.HasSynced, + wfc.cmSemaphoreInformer.HasSynced, wfc.wfTaskSetInformer.Informer().HasSynced, wfc.artGCTaskInformer.Informer().HasSynced, wfc.taskResultInformer.HasSynced, @@ -444,9 +452,11 @@ func (wfc *WorkflowController) initManagers(ctx context.Context) error { } // notifySemaphoreConfigUpdate will notify semaphore config update to pending workflows -func (wfc *WorkflowController) notifySemaphoreConfigUpdate(cm *apiv1.ConfigMap) { - log.Debugf("received semaphore config map %s/%s update", cm.GetNamespace(), cm.GetName()) - wfs, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.SemaphoreConfigIndexName, fmt.Sprintf("%s/%s", cm.Namespace, cm.Name)) +func (wfc *WorkflowController) notifySemaphoreConfigUpdate(ns string, name string) { + key := fmt.Sprintf("%s/%s", ns, name) + log.Debugf("received semaphore config map %s update", key) + + wfs, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.SemaphoreConfigIndexName, key) if err != nil { log.Errorf("failed get the workflow from informer. %v", err) } @@ -1031,9 +1041,6 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) // This function is called when an object is to be removed // from the informer DeleteFunc: func(obj interface{}) { - // IndexerInformer uses a delta queue, therefore for deletes we have to use this - // key function. - // Remove finalizers from Pods if they exist before deletion pods := wfc.kubeclientset.CoreV1().Pods(wfc.GetManagedNamespace()) podList, err := pods.List(ctx, metav1.ListOptions{ @@ -1048,6 +1055,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) } } + // IndexerInformer uses a delta queue, therefore for deletes we have to use this key function. key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil { wfc.releaseAllWorkflowLocks(obj) @@ -1239,33 +1247,42 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared return informer, nil } -var watchControllerSemaphoreConfigMaps = os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false" - func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer, error) { - indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ + indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc, + }, func(opts *metav1.ListOptions) { + opts.LabelSelector = common.LabelKeyConfigMapType // only configmaps with this label }) log.WithField("executorPlugins", wfc.executorPlugins != nil).Info("Plugins") + if wfc.executorPlugins == nil { + return indexInformer, nil + } - _, err := indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - cm := obj.(*apiv1.ConfigMap) - wfc.applyPluginCM(cm, "added") - }, - UpdateFunc: func(_, obj interface{}) { - cm := obj.(*apiv1.ConfigMap) - wfc.applyPluginCM(cm, "updated") - - if !watchControllerSemaphoreConfigMaps { - return + _, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + cmMeta, err := meta.Accessor(obj) + if err != nil { + log.WithError(err). + Error("failed to get configmap metadata") + return false } - wfc.notifySemaphoreConfigUpdate(cm) + return isPluginCM(cmMeta) }, - DeleteFunc: func(obj interface{}) { - cm := obj.(*apiv1.ConfigMap) - wfc.deletePluginCM(cm) + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + wfc.applyPluginCM(cm, "added") + }, + UpdateFunc: func(_, obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + wfc.applyPluginCM(cm, "updated") + }, + DeleteFunc: func(obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + wfc.deletePluginCM(cm) + }, }, }) @@ -1275,7 +1292,38 @@ func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer return indexInformer, nil } -func (wfc *WorkflowController) newConfigMapCtrlInformer(ctx context.Context) (cache.SharedIndexInformer, error) { +func isPluginCM(cmMeta metav1.Object) bool { + return cmMeta.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin +} + +func (wfc *WorkflowController) applyPluginCM(cm *apiv1.ConfigMap, verb string) { + p, err := plugin.FromConfigMap(cm) + if err != nil { + log.WithField("namespace", cm.GetNamespace()). + WithField("name", cm.GetName()). + WithError(err). + Error("failed to convert configmap to plugin") + return + } + if _, ok := wfc.executorPlugins[cm.GetNamespace()]; !ok { + wfc.executorPlugins[cm.GetNamespace()] = map[string]*spec.Plugin{} + } + wfc.executorPlugins[cm.GetNamespace()][cm.GetName()] = p + log.WithField("namespace", cm.GetNamespace()). + WithField("name", cm.GetName()). + Infof("Executor plugin %s", verb) +} + +func (wfc *WorkflowController) deletePluginCM(cm *apiv1.ConfigMap) { + key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(cm) + namespace, name, _ := cache.SplitMetaNamespaceKey(key) + delete(wfc.executorPlugins[namespace], name) + log.WithField("namespace", namespace).WithField("name", name).Info("Executor plugin removed") +} + +var watchControllerSemaphoreConfigMaps = os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false" + +func (wfc *WorkflowController) newConfigMapControllerInformer(ctx context.Context) (cache.SharedIndexInformer, error) { indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetNamespace(), 20*time.Minute, nil, func(opts *metav1.ListOptions) { opts.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, wfc.configController.GetName()).String() // only the controller configmap }) @@ -1298,6 +1346,60 @@ func (wfc *WorkflowController) newConfigMapCtrlInformer(ctx context.Context) (ca return indexInformer, nil } +func (wfc *WorkflowController) newConfigMapSemaphoreInformer() (cache.SharedIndexInformer, error) { + indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ + cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, + }).WithTransform(func(obj interface{}) (interface{}, error) { + cm, ok := obj.(*apiv1.ConfigMap) + if !ok { + return obj, nil + } + + // only leave name and namespace, remove the rest as we don't use it + cm = cm.DeepCopy() + cm.ObjectMeta = metav1.ObjectMeta{ + Name: cm.Name, + Namespace: cm.Namespace, + } + cm.Data = map[string]string{} + cm.BinaryData = map[string][]byte{} + return cm, nil + }) + + if !watchControllerSemaphoreConfigMaps { + return indexInformer, nil + } + + _, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + cmMeta, err := meta.Accessor(obj) + if err != nil { + log.WithError(err). + Error("failed to get configmap metadata") + return false + } + + return isSemaphoreCM(cmMeta.GetNamespace(), cmMeta.GetName()) + }, + Handler: cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(_, obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + wfc.notifySemaphoreConfigUpdate(cm.GetNamespace(), cm.GetName()) + }, + }, + }) + + if err != nil { + return nil, err + } + return indexInformer, nil +} + +func isSemaphoreCM(ns string, name string) bool { + key := fmt.Sprintf("%s/%s", ns, name) + return indexes.HasSemaphoreKey(key) +} + // call this func whenever the configuration changes, or when the workflow informer changes func (wfc *WorkflowController) updateEstimatorFactory() { wfc.estimatorFactory = estimation.NewEstimatorFactory(wfc.wfInformer, wfc.hydrator, wfc.wfArchive) @@ -1327,43 +1429,6 @@ func (wfc *WorkflowController) GetManagedNamespace() string { return wfc.GetNamespace() } -func (wfc *WorkflowController) isPluginCM(cm metav1.Object) bool { - return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin -} - -func (wfc *WorkflowController) applyPluginCM(cm *apiv1.ConfigMap, verb string) { - if !wfc.isPluginCM(cm) { - return - } - - p, err := plugin.FromConfigMap(cm) - if err != nil { - log.WithField("namespace", cm.GetNamespace()). - WithField("name", cm.GetName()). - WithError(err). - Error("failed to convert configmap to plugin") - return - } - if _, ok := wfc.executorPlugins[cm.GetNamespace()]; !ok { - wfc.executorPlugins[cm.GetNamespace()] = map[string]*spec.Plugin{} - } - wfc.executorPlugins[cm.GetNamespace()][cm.GetName()] = p - log.WithField("namespace", cm.GetNamespace()). - WithField("name", cm.GetName()). - Infof("Executor plugin %s", verb) -} - -func (wfc *WorkflowController) deletePluginCM(cm *apiv1.ConfigMap) { - if !wfc.isPluginCM(cm) { - return - } - - key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(cm) - namespace, name, _ := cache.SplitMetaNamespaceKey(key) - delete(wfc.executorPlugins[namespace], name) - log.WithField("namespace", namespace).WithField("name", name).Info("Executor plugin removed") -} - func (wfc *WorkflowController) getMaxStackDepth() int { return maxAllowedStackDepth } diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 3c4023d1ce82..2f896186ce20 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -323,7 +323,8 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl _ = wfc.addWorkflowInformerHandlers(ctx) wfc.podInformer, _ = wfc.newPodInformer(ctx) wfc.cmInformer, _ = wfc.newConfigMapInformer() - wfc.cmCtrlInformer, _ = wfc.newConfigMapCtrlInformer(ctx) + wfc.cmControllerInformer, _ = wfc.newConfigMapControllerInformer(ctx) + wfc.cmSemaphoreInformer, _ = wfc.newConfigMapSemaphoreInformer() wfc.createSynchronizationManager(ctx) _ = wfc.initManagers(ctx) diff --git a/workflow/controller/indexes/workflow_index.go b/workflow/controller/indexes/workflow_index.go index 71ed973b367f..c6b2a315fb1e 100644 --- a/workflow/controller/indexes/workflow_index.go +++ b/workflow/controller/indexes/workflow_index.go @@ -14,6 +14,7 @@ import ( var ( indexWorkflowSemaphoreKeys = os.Getenv("INDEX_WORKFLOW_SEMAPHORE_KEYS") != "false" + semaphoreKeyMap = make(map[string]struct{}) // set of all semaphore keys ) func init() { @@ -70,6 +71,16 @@ func WorkflowSemaphoreKeysIndexFunc() cache.IndexFunc { if err != nil { return nil, nil } - return wf.GetSemaphoreKeys(), nil + keys := wf.GetSemaphoreKeys() + // store all keys while indexing as a side effect + for _, key := range keys { + semaphoreKeyMap[key] = struct{}{} + } + return keys, nil } } + +func HasSemaphoreKey(key string) bool { + _, ok := semaphoreKeyMap[key] + return ok +} diff --git a/workflow/controller/indexes/workflow_index_test.go b/workflow/controller/indexes/workflow_index_test.go index d1ea7128b022..e285795b5417 100644 --- a/workflow/controller/indexes/workflow_index_test.go +++ b/workflow/controller/indexes/workflow_index_test.go @@ -87,6 +87,7 @@ func TestWorkflowSemaphoreKeysIndexFunc(t *testing.T) { result, err := WorkflowSemaphoreKeysIndexFunc()(un) assert.NoError(t, err) assert.Len(t, result, 1) + assert.True(t, HasSemaphoreKey(result)) }) t.Run("Incomplete", func(t *testing.T) { un, _ := util.ToUnstructured(&wfv1.Workflow{ @@ -106,6 +107,7 @@ func TestWorkflowSemaphoreKeysIndexFunc(t *testing.T) { result, err := WorkflowSemaphoreKeysIndexFunc()(un) assert.NoError(t, err) assert.Len(t, result, 1) + assert.True(t, HasSemaphoreKey(result)) }) t.Run("Complete", func(t *testing.T) { un, _ := util.ToUnstructured(&wfv1.Workflow{ @@ -118,5 +120,6 @@ func TestWorkflowSemaphoreKeysIndexFunc(t *testing.T) { result, err := WorkflowSemaphoreKeysIndexFunc()(un) assert.NoError(t, err) assert.Nil(t, result) + assert.False(t, HasSemaphoreKey(result)) }) } From 8e1fc5dc9ba919ccb219fa453d5fb174aa561831 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Mon, 8 Apr 2024 17:58:16 -0400 Subject: [PATCH 25/33] fix transform Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 669385d53931..564c93ea01a4 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1349,7 +1349,8 @@ func (wfc *WorkflowController) newConfigMapControllerInformer(ctx context.Contex func (wfc *WorkflowController) newConfigMapSemaphoreInformer() (cache.SharedIndexInformer, error) { indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, - }).WithTransform(func(obj interface{}) (interface{}, error) { + }) + indexInformer.SharedInformer.SetTransform(func(obj interface{}) (interface{}, error) { cm, ok := obj.(*apiv1.ConfigMap) if !ok { return obj, nil From b96a18aa632efe487349f79b573acfede2e962e4 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Mon, 8 Apr 2024 18:08:24 -0400 Subject: [PATCH 26/33] woops i read the type wrong Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 564c93ea01a4..fabf9d3aa3d6 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1350,7 +1350,7 @@ func (wfc *WorkflowController) newConfigMapSemaphoreInformer() (cache.SharedInde indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) - indexInformer.SharedInformer.SetTransform(func(obj interface{}) (interface{}, error) { + indexInformer.SetTransform(func(obj interface{}) (interface{}, error) { cm, ok := obj.(*apiv1.ConfigMap) if !ok { return obj, nil From b3e32746bd914c0db362df32761953d1bb381791 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Mon, 8 Apr 2024 18:17:29 -0400 Subject: [PATCH 27/33] we can remove some error handling as it will never happen - the errors [only occur](https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580) when the informer was either already started or stopped - and we just created it, so we know that is not the case Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 49 ++++++++------------------ workflow/controller/controller_test.go | 6 ++-- 2 files changed, 17 insertions(+), 38 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index fabf9d3aa3d6..d881adc62d7c 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -321,18 +321,9 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo } wfc.updateEstimatorFactory() - wfc.cmInformer, err = wfc.newConfigMapInformer() - if err != nil { - log.Fatal(err) - } - wfc.cmControllerInformer, err = wfc.newConfigMapControllerInformer(ctx) - if err != nil { - log.Fatal(err) - } - wfc.cmSemaphoreInformer, err = wfc.newConfigMapSemaphoreInformer() - if err != nil { - log.Fatal(err) - } + wfc.cmInformer = wfc.newConfigMapInformer() + wfc.cmControllerInformer = wfc.newConfigMapControllerInformer(ctx) + wfc.cmSemaphoreInformer = wfc.newConfigMapSemaphoreInformer() // Create Synchronization Manager wfc.createSynchronizationManager(ctx) @@ -1247,7 +1238,7 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared return informer, nil } -func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer, error) { +func (wfc *WorkflowController) newConfigMapInformer() cache.SharedIndexInformer { indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc, }, func(opts *metav1.ListOptions) { @@ -1256,10 +1247,10 @@ func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer log.WithField("executorPlugins", wfc.executorPlugins != nil).Info("Plugins") if wfc.executorPlugins == nil { - return indexInformer, nil + return indexInformer } - _, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { cmMeta, err := meta.Accessor(obj) if err != nil { @@ -1285,10 +1276,6 @@ func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer }, }, }) - - if err != nil { - return nil, err - } return indexInformer, nil } @@ -1323,30 +1310,26 @@ func (wfc *WorkflowController) deletePluginCM(cm *apiv1.ConfigMap) { var watchControllerSemaphoreConfigMaps = os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false" -func (wfc *WorkflowController) newConfigMapControllerInformer(ctx context.Context) (cache.SharedIndexInformer, error) { +func (wfc *WorkflowController) newConfigMapControllerInformer(ctx context.Context) cache.SharedIndexInformer { indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetNamespace(), 20*time.Minute, nil, func(opts *metav1.ListOptions) { opts.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, wfc.configController.GetName()).String() // only the controller configmap }) if !watchControllerSemaphoreConfigMaps { - return indexInformer, nil + return indexInformer } - _, err := indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(_, obj interface{}) { cm := obj.(*apiv1.ConfigMap) log.Infof("Received Workflow Controller config map %s/%s update", cm.GetNamespace(), cm.GetName()) wfc.UpdateConfig(ctx) }, }) - - if err != nil { - return nil, err - } - return indexInformer, nil + return indexInformer } -func (wfc *WorkflowController) newConfigMapSemaphoreInformer() (cache.SharedIndexInformer, error) { +func (wfc *WorkflowController) newConfigMapSemaphoreInformer() cache.SharedIndexInformer { indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) @@ -1368,10 +1351,10 @@ func (wfc *WorkflowController) newConfigMapSemaphoreInformer() (cache.SharedInde }) if !watchControllerSemaphoreConfigMaps { - return indexInformer, nil + return indexInformer } - _, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { cmMeta, err := meta.Accessor(obj) if err != nil { @@ -1389,11 +1372,7 @@ func (wfc *WorkflowController) newConfigMapSemaphoreInformer() (cache.SharedInde }, }, }) - - if err != nil { - return nil, err - } - return indexInformer, nil + return indexInformer } func isSemaphoreCM(ns string, name string) bool { diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 2f896186ce20..aa6d23c03e5a 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -322,9 +322,9 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates() _ = wfc.addWorkflowInformerHandlers(ctx) wfc.podInformer, _ = wfc.newPodInformer(ctx) - wfc.cmInformer, _ = wfc.newConfigMapInformer() - wfc.cmControllerInformer, _ = wfc.newConfigMapControllerInformer(ctx) - wfc.cmSemaphoreInformer, _ = wfc.newConfigMapSemaphoreInformer() + wfc.cmInformer = wfc.newConfigMapInformer() + wfc.cmControllerInformer = wfc.newConfigMapControllerInformer(ctx) + wfc.cmSemaphoreInformer = wfc.newConfigMapSemaphoreInformer() wfc.createSynchronizationManager(ctx) _ = wfc.initManagers(ctx) From 303501a17c236d3c9e6ac34d2ba713c83817d962 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Mon, 8 Apr 2024 18:25:09 -0400 Subject: [PATCH 28/33] Revert "we can remove some error handling as it will never happen" This reverts commit b3e32746bd914c0db362df32761953d1bb381791. Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 49 ++++++++++++++++++-------- workflow/controller/controller_test.go | 6 ++-- 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index d881adc62d7c..fabf9d3aa3d6 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -321,9 +321,18 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo } wfc.updateEstimatorFactory() - wfc.cmInformer = wfc.newConfigMapInformer() - wfc.cmControllerInformer = wfc.newConfigMapControllerInformer(ctx) - wfc.cmSemaphoreInformer = wfc.newConfigMapSemaphoreInformer() + wfc.cmInformer, err = wfc.newConfigMapInformer() + if err != nil { + log.Fatal(err) + } + wfc.cmControllerInformer, err = wfc.newConfigMapControllerInformer(ctx) + if err != nil { + log.Fatal(err) + } + wfc.cmSemaphoreInformer, err = wfc.newConfigMapSemaphoreInformer() + if err != nil { + log.Fatal(err) + } // Create Synchronization Manager wfc.createSynchronizationManager(ctx) @@ -1238,7 +1247,7 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared return informer, nil } -func (wfc *WorkflowController) newConfigMapInformer() cache.SharedIndexInformer { +func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer, error) { indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc, }, func(opts *metav1.ListOptions) { @@ -1247,10 +1256,10 @@ func (wfc *WorkflowController) newConfigMapInformer() cache.SharedIndexInformer log.WithField("executorPlugins", wfc.executorPlugins != nil).Info("Plugins") if wfc.executorPlugins == nil { - return indexInformer + return indexInformer, nil } - indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + _, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { cmMeta, err := meta.Accessor(obj) if err != nil { @@ -1276,6 +1285,10 @@ func (wfc *WorkflowController) newConfigMapInformer() cache.SharedIndexInformer }, }, }) + + if err != nil { + return nil, err + } return indexInformer, nil } @@ -1310,26 +1323,30 @@ func (wfc *WorkflowController) deletePluginCM(cm *apiv1.ConfigMap) { var watchControllerSemaphoreConfigMaps = os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false" -func (wfc *WorkflowController) newConfigMapControllerInformer(ctx context.Context) cache.SharedIndexInformer { +func (wfc *WorkflowController) newConfigMapControllerInformer(ctx context.Context) (cache.SharedIndexInformer, error) { indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetNamespace(), 20*time.Minute, nil, func(opts *metav1.ListOptions) { opts.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, wfc.configController.GetName()).String() // only the controller configmap }) if !watchControllerSemaphoreConfigMaps { - return indexInformer + return indexInformer, nil } - indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err := indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(_, obj interface{}) { cm := obj.(*apiv1.ConfigMap) log.Infof("Received Workflow Controller config map %s/%s update", cm.GetNamespace(), cm.GetName()) wfc.UpdateConfig(ctx) }, }) - return indexInformer + + if err != nil { + return nil, err + } + return indexInformer, nil } -func (wfc *WorkflowController) newConfigMapSemaphoreInformer() cache.SharedIndexInformer { +func (wfc *WorkflowController) newConfigMapSemaphoreInformer() (cache.SharedIndexInformer, error) { indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) @@ -1351,10 +1368,10 @@ func (wfc *WorkflowController) newConfigMapSemaphoreInformer() cache.SharedIndex }) if !watchControllerSemaphoreConfigMaps { - return indexInformer + return indexInformer, nil } - indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + _, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { cmMeta, err := meta.Accessor(obj) if err != nil { @@ -1372,7 +1389,11 @@ func (wfc *WorkflowController) newConfigMapSemaphoreInformer() cache.SharedIndex }, }, }) - return indexInformer + + if err != nil { + return nil, err + } + return indexInformer, nil } func isSemaphoreCM(ns string, name string) bool { diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index aa6d23c03e5a..2f896186ce20 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -322,9 +322,9 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates() _ = wfc.addWorkflowInformerHandlers(ctx) wfc.podInformer, _ = wfc.newPodInformer(ctx) - wfc.cmInformer = wfc.newConfigMapInformer() - wfc.cmControllerInformer = wfc.newConfigMapControllerInformer(ctx) - wfc.cmSemaphoreInformer = wfc.newConfigMapSemaphoreInformer() + wfc.cmInformer, _ = wfc.newConfigMapInformer() + wfc.cmControllerInformer, _ = wfc.newConfigMapControllerInformer(ctx) + wfc.cmSemaphoreInformer, _ = wfc.newConfigMapSemaphoreInformer() wfc.createSynchronizationManager(ctx) _ = wfc.initManagers(ctx) From c68523434e4c30e154860a0322893509c234efeb Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Mon, 8 Apr 2024 18:30:34 -0400 Subject: [PATCH 29/33] fix unit tests Signed-off-by: Anton Gilgur --- workflow/controller/controller_test.go | 7 ++----- workflow/controller/indexes/workflow_index_test.go | 5 ++--- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 2f896186ce20..0ac90a049abc 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -929,10 +929,7 @@ func TestNotifySemaphoreConfigUpdate(t *testing.T) { cancel, controller := newController(wf, wf1, wf2) defer cancel() - cm := apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{ - Name: "my-config", - Namespace: "default", - }} + cmNamespace, cmName := "default", "my-config" assert.Equal(3, controller.wfQueue.Len()) // Remove all Wf from Worker queue @@ -942,7 +939,7 @@ func TestNotifySemaphoreConfigUpdate(t *testing.T) { } assert.Equal(0, controller.wfQueue.Len()) - controller.notifySemaphoreConfigUpdate(&cm) + controller.notifySemaphoreConfigUpdate(cmNamespace, cmName) time.Sleep(2 * time.Second) assert.Equal(2, controller.wfQueue.Len()) } diff --git a/workflow/controller/indexes/workflow_index_test.go b/workflow/controller/indexes/workflow_index_test.go index e285795b5417..4c3138ee5203 100644 --- a/workflow/controller/indexes/workflow_index_test.go +++ b/workflow/controller/indexes/workflow_index_test.go @@ -87,7 +87,7 @@ func TestWorkflowSemaphoreKeysIndexFunc(t *testing.T) { result, err := WorkflowSemaphoreKeysIndexFunc()(un) assert.NoError(t, err) assert.Len(t, result, 1) - assert.True(t, HasSemaphoreKey(result)) + assert.True(t, HasSemaphoreKey(result[0])) }) t.Run("Incomplete", func(t *testing.T) { un, _ := util.ToUnstructured(&wfv1.Workflow{ @@ -107,7 +107,7 @@ func TestWorkflowSemaphoreKeysIndexFunc(t *testing.T) { result, err := WorkflowSemaphoreKeysIndexFunc()(un) assert.NoError(t, err) assert.Len(t, result, 1) - assert.True(t, HasSemaphoreKey(result)) + assert.True(t, HasSemaphoreKey(result[0])) }) t.Run("Complete", func(t *testing.T) { un, _ := util.ToUnstructured(&wfv1.Workflow{ @@ -120,6 +120,5 @@ func TestWorkflowSemaphoreKeysIndexFunc(t *testing.T) { result, err := WorkflowSemaphoreKeysIndexFunc()(un) assert.NoError(t, err) assert.Nil(t, result) - assert.False(t, HasSemaphoreKey(result)) }) } From 91d0fff5f15a8cb1e1980e71794c6d5c92d887f7 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Mon, 8 Apr 2024 19:20:49 -0400 Subject: [PATCH 30/33] ignore the lint explicitly Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index fabf9d3aa3d6..2b3f1469f478 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1350,6 +1350,7 @@ func (wfc *WorkflowController) newConfigMapSemaphoreInformer() (cache.SharedInde indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) + //nolint:errcheck // the error only happens if the informer has already started, which hasn't happened yet (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L446) indexInformer.SetTransform(func(obj interface{}) (interface{}, error) { cm, ok := obj.(*apiv1.ConfigMap) if !ok { From efb6b9295862eb93261fceebe494267f675cf306 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Mon, 8 Apr 2024 19:29:34 -0400 Subject: [PATCH 31/33] remove some other unnecessary error handlers Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 54 +++++++++----------------- workflow/controller/controller_test.go | 6 +-- 2 files changed, 21 insertions(+), 39 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 2b3f1469f478..dad5512ecf49 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -321,18 +321,9 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo } wfc.updateEstimatorFactory() - wfc.cmInformer, err = wfc.newConfigMapInformer() - if err != nil { - log.Fatal(err) - } - wfc.cmControllerInformer, err = wfc.newConfigMapControllerInformer(ctx) - if err != nil { - log.Fatal(err) - } - wfc.cmSemaphoreInformer, err = wfc.newConfigMapSemaphoreInformer() - if err != nil { - log.Fatal(err) - } + wfc.cmInformer = wfc.newConfigMapInformer() + wfc.cmControllerInformer = wfc.newConfigMapControllerInformer(ctx) + wfc.cmSemaphoreInformer = wfc.newConfigMapSemaphoreInformer() // Create Synchronization Manager wfc.createSynchronizationManager(ctx) @@ -1247,7 +1238,7 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared return informer, nil } -func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer, error) { +func (wfc *WorkflowController) newConfigMapInformer() cache.SharedIndexInformer { indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc, }, func(opts *metav1.ListOptions) { @@ -1256,10 +1247,11 @@ func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer log.WithField("executorPlugins", wfc.executorPlugins != nil).Info("Plugins") if wfc.executorPlugins == nil { - return indexInformer, nil + return indexInformer } - _, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + //nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580) + indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { cmMeta, err := meta.Accessor(obj) if err != nil { @@ -1285,11 +1277,7 @@ func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer }, }, }) - - if err != nil { - return nil, err - } - return indexInformer, nil + return indexInformer } func isPluginCM(cmMeta metav1.Object) bool { @@ -1323,30 +1311,27 @@ func (wfc *WorkflowController) deletePluginCM(cm *apiv1.ConfigMap) { var watchControllerSemaphoreConfigMaps = os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false" -func (wfc *WorkflowController) newConfigMapControllerInformer(ctx context.Context) (cache.SharedIndexInformer, error) { +func (wfc *WorkflowController) newConfigMapControllerInformer(ctx context.Context) cache.SharedIndexInformer { indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetNamespace(), 20*time.Minute, nil, func(opts *metav1.ListOptions) { opts.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, wfc.configController.GetName()).String() // only the controller configmap }) if !watchControllerSemaphoreConfigMaps { - return indexInformer, nil + return indexInformer } - _, err := indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + //nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580) + indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(_, obj interface{}) { cm := obj.(*apiv1.ConfigMap) log.Infof("Received Workflow Controller config map %s/%s update", cm.GetNamespace(), cm.GetName()) wfc.UpdateConfig(ctx) }, }) - - if err != nil { - return nil, err - } - return indexInformer, nil + return indexInformer } -func (wfc *WorkflowController) newConfigMapSemaphoreInformer() (cache.SharedIndexInformer, error) { +func (wfc *WorkflowController) newConfigMapSemaphoreInformer() cache.SharedIndexInformer { indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, }) @@ -1369,10 +1354,11 @@ func (wfc *WorkflowController) newConfigMapSemaphoreInformer() (cache.SharedInde }) if !watchControllerSemaphoreConfigMaps { - return indexInformer, nil + return indexInformer } - _, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + //nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580) + indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { cmMeta, err := meta.Accessor(obj) if err != nil { @@ -1390,11 +1376,7 @@ func (wfc *WorkflowController) newConfigMapSemaphoreInformer() (cache.SharedInde }, }, }) - - if err != nil { - return nil, err - } - return indexInformer, nil + return indexInformer } func isSemaphoreCM(ns string, name string) bool { diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 0ac90a049abc..340812d306b8 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -322,9 +322,9 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates() _ = wfc.addWorkflowInformerHandlers(ctx) wfc.podInformer, _ = wfc.newPodInformer(ctx) - wfc.cmInformer, _ = wfc.newConfigMapInformer() - wfc.cmControllerInformer, _ = wfc.newConfigMapControllerInformer(ctx) - wfc.cmSemaphoreInformer, _ = wfc.newConfigMapSemaphoreInformer() + wfc.cmInformer = wfc.newConfigMapInformer() + wfc.cmControllerInformer = wfc.newConfigMapControllerInformer(ctx) + wfc.cmSemaphoreInformer = wfc.newConfigMapSemaphoreInformer() wfc.createSynchronizationManager(ctx) _ = wfc.initManagers(ctx) From 2f0f612ef7fce33a904153eca4a996994ae68e4b Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Wed, 10 Apr 2024 15:28:47 -0400 Subject: [PATCH 32/33] fix wfc.Config.Namespace issue and add clarifying comment Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index dad5512ecf49..c8f02b097d4f 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1403,14 +1403,14 @@ func (wfc *WorkflowController) setWorkflowDefaults(wf *wfv1.Workflow) error { } func (wfc *WorkflowController) GetNamespace() string { - return wfc.Config.Namespace + return wfc.namespace } func (wfc *WorkflowController) GetManagedNamespace() string { if wfc.managedNamespace != "" { return wfc.managedNamespace } - return wfc.GetNamespace() + return wfc.Config.Namespace // if not set, then empty string = cluster-scope. if set, then limit the watch } func (wfc *WorkflowController) getMaxStackDepth() int { From 05ee735a985473e4f15b1fc642f6ff96571aa7c5 Mon Sep 17 00:00:00 2001 From: Anton Gilgur Date: Fri, 19 Apr 2024 11:53:50 -0400 Subject: [PATCH 33/33] add code comment about env var Signed-off-by: Anton Gilgur --- workflow/controller/controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index c8f02b097d4f..091d3b018bde 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -1309,6 +1309,7 @@ func (wfc *WorkflowController) deletePluginCM(cm *apiv1.ConfigMap) { log.WithField("namespace", namespace).WithField("name", name).Info("Executor plugin removed") } +// Whether to watch the Controller's ConfigMap and semaphore ConfigMaps for run-time changes var watchControllerSemaphoreConfigMaps = os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false" func (wfc *WorkflowController) newConfigMapControllerInformer(ctx context.Context) cache.SharedIndexInformer {