Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refactor runConfigMapWatcher to use Informers. Fixes #11657 #11855

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c4e2d4f
fix: Refactor the func newConfigMapInformer to watch both main and mg…
juranir Apr 2, 2024
ba5eb1f
Merge branch 'argoproj:main' into master
juranir Apr 2, 2024
7295d59
Merge branch 'main' into master
juranir Apr 2, 2024
9006211
refactor out an applyPluginCM func
agilgur5 Apr 5, 2024
dbb84d7
renames
agilgur5 Apr 5, 2024
cd91b70
move some functions around
agilgur5 Apr 6, 2024
8096a06
add ns checks
agilgur5 Apr 6, 2024
6babe0e
comments
agilgur5 Apr 6, 2024
1d4015d
fix some more renames
agilgur5 Apr 6, 2024
7f9c0c5
also refactor out delete plugin for consistency
agilgur5 Apr 6, 2024
59f23bc
properly remove/replace runConfigMapWatcher
agilgur5 Apr 6, 2024
545826f
typings
agilgur5 Apr 6, 2024
963158a
woops not all those typings
agilgur5 Apr 6, 2024
43527e9
use two configmap informers, one for managed ns, other for self ns
agilgur5 Apr 6, 2024
ecfdad2
remove extra spaces
agilgur5 Apr 6, 2024
1250b46
field selector to string
agilgur5 Apr 6, 2024
5ca38e4
fix test init
agilgur5 Apr 6, 2024
909b8e9
revert labelSelector removal? this seems to have broke some E2Es... h…
agilgur5 Apr 6, 2024
edd274c
Revert "revert labelSelector removal? this seems to have broke some E…
agilgur5 Apr 6, 2024
e683292
ah wrong informer
agilgur5 Apr 6, 2024
9e749e2
cm is obj just with a typecast, use the same arg
agilgur5 Apr 8, 2024
579823b
tiny refactor
agilgur5 Apr 8, 2024
700dd37
rename again -- all other informers don't specify 'managed' despite w…
agilgur5 Apr 8, 2024
91000ca
tiny optimization
agilgur5 Apr 8, 2024
abea677
rename miss fix
agilgur5 Apr 8, 2024
f8e0872
refactor out a third informer for semaphores, for better filtering
agilgur5 Apr 8, 2024
8e1fc5d
fix transform
agilgur5 Apr 8, 2024
b96a18a
woops i read the type wrong
agilgur5 Apr 8, 2024
b3e3274
we can remove some error handling as it will never happen
agilgur5 Apr 8, 2024
303501a
Revert "we can remove some error handling as it will never happen"
agilgur5 Apr 8, 2024
c685234
fix unit tests
agilgur5 Apr 8, 2024
91d0fff
ignore the lint explicitly
agilgur5 Apr 8, 2024
efb6b92
remove some other unnecessary error handlers
agilgur5 Apr 8, 2024
2f0f612
fix wfc.Config.Namespace issue and add clarifying comment
agilgur5 Apr 10, 2024
05ee735
add code comment about env var
agilgur5 Apr 19, 2024
8f97dcb
Merge branch 'main' into fix-configmap-informer
agilgur5 Apr 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion workflow/controller/cache_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func init() {

// syncAllCacheForGC syncs all cache for GC
func (wfc *WorkflowController) syncAllCacheForGC(ctx context.Context) {
configMaps, err := wfc.configMapInformer.GetIndexer().ByIndex(indexes.ConfigMapLabelsIndex, common.LabelValueTypeConfigMapCache)
configMaps, err := wfc.cmInformer.GetIndexer().ByIndex(indexes.ConfigMapLabelsIndex, common.LabelValueTypeConfigMapCache)
if err != nil {
log.WithError(err).Error("Failed to get configmaps from informer")
return
Expand Down
206 changes: 104 additions & 102 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
"golang.org/x/time/rate"
apiv1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
Expand All @@ -35,7 +35,6 @@ import (
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
apiwatch "k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"

Expand Down Expand Up @@ -123,7 +122,8 @@ type WorkflowController struct {
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer
cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer
podInformer cache.SharedIndexInformer
configMapInformer cache.SharedIndexInformer
cmInformer cache.SharedIndexInformer // configmaps of plugins, parameters, memoizations, etc
cmCtrlInformer cache.SharedIndexInformer // controller's own configmap
wfQueue workqueue.RateLimitingInterface
podCleanupQueue workqueue.RateLimitingInterface // pods to be deleted or labelled depend on GC strategy
throttler sync.Throttler
Expand Down Expand Up @@ -319,7 +319,11 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
}
wfc.updateEstimatorFactory()

wfc.configMapInformer, err = wfc.newConfigMapInformer()
wfc.cmInformer, err = wfc.newConfigMapInformer()
if err != nil {
log.Fatal(err)
}
wfc.cmCtrlInformer, err = wfc.newConfigMapCtrlInformer(ctx)
if err != nil {
log.Fatal(err)
}
Expand All @@ -331,14 +335,11 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
log.Fatal(err)
}

if os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false" {
go wfc.runConfigMapWatcher(ctx.Done())
}

go wfc.wfInformer.Run(ctx.Done())
go wfc.wftmplInformer.Informer().Run(ctx.Done())
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
go wfc.podInformer.Run(ctx.Done())
go wfc.configMapInformer.Run(ctx.Done())
go wfc.cmInformer.Run(ctx.Done())
go wfc.cmCtrlInformer.Run(ctx.Done())
go wfc.wfTaskSetInformer.Informer().Run(ctx.Done())
go wfc.artGCTaskInformer.Informer().Run(ctx.Done())
go wfc.taskResultInformer.Run(ctx.Done())
Expand All @@ -350,7 +351,8 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
wfc.wfInformer.HasSynced,
wfc.wftmplInformer.Informer().HasSynced,
wfc.podInformer.HasSynced,
wfc.configMapInformer.HasSynced,
wfc.cmInformer.HasSynced,
wfc.cmCtrlInformer.HasSynced,
wfc.wfTaskSetInformer.Informer().HasSynced,
wfc.artGCTaskInformer.Informer().HasSynced,
wfc.taskResultInformer.HasSynced,
Expand Down Expand Up @@ -441,42 +443,9 @@ func (wfc *WorkflowController) initManagers(ctx context.Context) error {
return nil
}

// runConfigMapWatcher watches ConfigMaps in wfc.managedNamespace and reacts to
// every event it receives:
//   - if the ConfigMap is the controller's own (same name as the config
//     controller and located in wfc.namespace), the controller config is reloaded;
//   - in all cases pending workflows are notified through
//     notifySemaphoreConfigUpdate.
//
// The function blocks until stopCh is closed or the watcher's result channel
// is closed, so it is meant to be run in its own goroutine. It panics if the
// watcher cannot be created.
func (wfc *WorkflowController) runConfigMapWatcher(stopCh <-chan struct{}) {
	defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

	ctx := context.Background()
	retryWatcher, err := apiwatch.NewRetryWatcher("1", &cache.ListWatch{
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			// NOTE(review): the options handed in by the RetryWatcher are
			// intentionally not forwarded here (original behaviour); every
			// (re)watch is opened with empty ListOptions.
			return wfc.kubeclientset.CoreV1().ConfigMaps(wfc.managedNamespace).Watch(ctx, metav1.ListOptions{})
		},
	})
	if err != nil {
		panic(err)
	}
	defer retryWatcher.Stop()

	for {
		select {
		case event, open := <-retryWatcher.ResultChan():
			if !open {
				// A closed channel yields zero-value events forever; without
				// this check the loop would spin and flood the log.
				log.Errorf("config map watcher result channel closed. Stopping config watcher")
				return
			}
			cm, ok := event.Object.(*apiv1.ConfigMap)
			if !ok {
				log.Errorf("invalid config map object received in config watcher. Ignored processing")
				continue
			}
			log.Debugf("received config map %s/%s update", cm.Namespace, cm.Name)
			if cm.GetName() == wfc.configController.GetName() && wfc.namespace == cm.GetNamespace() {
				log.Infof("Received Workflow Controller config map %s/%s update", cm.Namespace, cm.Name)
				wfc.UpdateConfig(ctx)
			}
			wfc.notifySemaphoreConfigUpdate(cm)
		case <-stopCh:
			return
		}
	}
}

// notifySemaphoreConfigUpdate will notify semaphore config update to pending workflows
func (wfc *WorkflowController) notifySemaphoreConfigUpdate(cm *apiv1.ConfigMap) {
log.Debugf("received semaphore config map %s/%s update", cm.GetNamespace(), cm.GetName())
wfs, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.SemaphoreConfigIndexName, fmt.Sprintf("%s/%s", cm.Namespace, cm.Name))
if err != nil {
log.Errorf("failed get the workflow from informer. %v", err)
Expand Down Expand Up @@ -1270,69 +1239,61 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared
return informer, nil
}

// watchControllerSemaphoreConfigMaps is read once, at package initialisation,
// from the WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS environment variable. It is
// true unless the variable is set to exactly "false". When false, ConfigMap
// updates are not forwarded to notifySemaphoreConfigUpdate and no update
// handler is registered on the controller's own ConfigMap informer.
var watchControllerSemaphoreConfigMaps = os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false"

func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer, error) {
indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{
indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{
juranir marked this conversation as resolved.
Show resolved Hide resolved
indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc,
juranir marked this conversation as resolved.
Show resolved Hide resolved
}, func(opts *metav1.ListOptions) {
opts.LabelSelector = common.LabelKeyConfigMapType
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
})

log.WithField("executorPlugins", wfc.executorPlugins != nil).Info("Plugins")
if wfc.executorPlugins != nil {
_, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
cm, err := meta.Accessor(obj)
if err != nil {
return false
}
return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cm := obj.(*apiv1.ConfigMap)
p, err := plugin.FromConfigMap(cm)
if err != nil {
log.WithField("namespace", cm.GetNamespace()).
WithField("name", cm.GetName()).
WithError(err).
Error("failed to convert configmap to plugin")
return
}
if _, ok := wfc.executorPlugins[cm.GetNamespace()]; !ok {
wfc.executorPlugins[cm.GetNamespace()] = map[string]*spec.Plugin{}
}
wfc.executorPlugins[cm.GetNamespace()][cm.GetName()] = p
log.WithField("namespace", cm.GetNamespace()).
WithField("name", cm.GetName()).
Info("Executor plugin added")
},
UpdateFunc: func(_, obj interface{}) {
cm := obj.(*apiv1.ConfigMap)
p, err := plugin.FromConfigMap(cm)
if err != nil {
log.WithField("namespace", cm.GetNamespace()).
WithField("name", cm.GetName()).
WithError(err).
Error("failed to convert configmap to plugin")
return
}

wfc.executorPlugins[cm.GetNamespace()][cm.GetName()] = p
log.WithField("namespace", cm.GetNamespace()).
WithField("name", cm.GetName()).
Info("Executor plugin updated")
},
DeleteFunc: func(obj interface{}) {
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
namespace, name, _ := cache.SplitMetaNamespaceKey(key)
delete(wfc.executorPlugins[namespace], name)
log.WithField("namespace", namespace).WithField("name", name).Info("Executor plugin removed")
},
},
})
if err != nil {
return nil, err
}
_, err := indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cm := obj.(*apiv1.ConfigMap)
wfc.applyPluginCM(cm, "added")
},
UpdateFunc: func(_, obj interface{}) {
cm := obj.(*apiv1.ConfigMap)
wfc.applyPluginCM(cm, "updated")

if !watchControllerSemaphoreConfigMaps {
return
}

wfc.notifySemaphoreConfigUpdate(cm)
},
DeleteFunc: func(obj interface{}) {
cm := obj.(*apiv1.ConfigMap)
wfc.deletePluginCM(cm)
},
})

if err != nil {
return nil, err
}
return indexInformer, nil
}

func (wfc *WorkflowController) newConfigMapCtrlInformer(ctx context.Context) (cache.SharedIndexInformer, error) {
indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetNamespace(), 20*time.Minute, nil, func(opts *metav1.ListOptions) {
opts.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, wfc.configController.GetName()).String() // only the controller configmap
})

agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
if !watchControllerSemaphoreConfigMaps {
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
return indexInformer, nil
}

_, err := indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(_, obj interface{}) {
cm := obj.(*apiv1.ConfigMap)
log.Infof("Received Workflow Controller config map %s/%s update", cm.GetNamespace(), cm.GetName())
wfc.UpdateConfig(ctx)
Copy link

@agilgur5 agilgur5 Apr 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's an optimization that can be done here as well -- we can just pass the configmap directly. UpdateConfig does a Get, which is not necessary here.

that requires a bit of refactoring, including in the config controller, and is independent of this PR (the optimization probably could have been done before this PR, in the RetryWatcher, as well), so I'm going to do that as a separate follow-up PR

},
})

if err != nil {
return nil, err
}
return indexInformer, nil
}
Expand All @@ -1355,11 +1316,52 @@ func (wfc *WorkflowController) setWorkflowDefaults(wf *wfv1.Workflow) error {
return nil
}

// GetNamespace returns the namespace recorded in the controller's Config.
func (wfc *WorkflowController) GetNamespace() string {
	ns := wfc.Config.Namespace
	return ns
}

// GetManagedNamespace returns wfc.managedNamespace when it is non-empty and
// otherwise falls back to GetNamespace().
func (wfc *WorkflowController) GetManagedNamespace() string {
	if wfc.managedNamespace != "" {
		return wfc.managedNamespace
	}
	return wfc.GetNamespace()
}

// isPluginCM reports whether cm carries the ConfigMap-type label with the
// executor-plugin value.
func (wfc *WorkflowController) isPluginCM(cm metav1.Object) bool {
	cmType := cm.GetLabels()[common.LabelKeyConfigMapType]
	return cmType == common.LabelValueTypeConfigMapExecutorPlugin
}

// applyPluginCM registers (or replaces) the executor plugin described by cm in
// wfc.executorPlugins, keyed by namespace and then by ConfigMap name.
//
// ConfigMaps that are not executor-plugin ConfigMaps are ignored. A ConfigMap
// that cannot be converted to a plugin is logged and skipped. verb is only
// used in the success log line ("Executor plugin <verb>").
func (wfc *WorkflowController) applyPluginCM(cm *apiv1.ConfigMap, verb string) {
	if !wfc.isPluginCM(cm) {
		return
	}
	// A nil executorPlugins map means plugins are not enabled; writing to it
	// would panic, so there is nothing to record.
	if wfc.executorPlugins == nil {
		return
	}

	namespace, name := cm.GetNamespace(), cm.GetName()
	logger := log.WithField("namespace", namespace).WithField("name", name)

	p, err := plugin.FromConfigMap(cm)
	if err != nil {
		logger.WithError(err).Error("failed to convert configmap to plugin")
		return
	}

	// Lazily create the per-namespace map on first use.
	nsPlugins, ok := wfc.executorPlugins[namespace]
	if !ok {
		nsPlugins = map[string]*spec.Plugin{}
		wfc.executorPlugins[namespace] = nsPlugins
	}
	nsPlugins[name] = p
	logger.Infof("Executor plugin %s", verb)
}

// deletePluginCM removes the executor plugin registered for cm (by namespace
// and ConfigMap name) from wfc.executorPlugins and logs the removal.
// ConfigMaps that are not executor-plugin ConfigMaps are ignored.
func (wfc *WorkflowController) deletePluginCM(cm *apiv1.ConfigMap) {
	if !wfc.isPluginCM(cm) {
		return
	}

	// cm is already a typed ConfigMap, so its namespace and name can be read
	// directly instead of building and re-splitting a cache key (and silently
	// discarding the errors those helpers return).
	namespace, name := cm.GetNamespace(), cm.GetName()
	delete(wfc.executorPlugins[namespace], name)
	log.WithField("namespace", namespace).WithField("name", name).Info("Executor plugin removed")
}

func (wfc *WorkflowController) getMaxStackDepth() int {
Expand Down
3 changes: 2 additions & 1 deletion workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates()
_ = wfc.addWorkflowInformerHandlers(ctx)
wfc.podInformer, _ = wfc.newPodInformer(ctx)
wfc.configMapInformer, _ = wfc.newConfigMapInformer()
wfc.cmInformer, _ = wfc.newConfigMapInformer()
wfc.cmCtrlInformer, _ = wfc.newConfigMapCtrlInformer(ctx)
wfc.createSynchronizationManager(ctx)
_ = wfc.initManagers(ctx)

Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
}
}

processedTmpl, err := common.ProcessArgs(tmpl, &task.Arguments, woc.globalParams, map[string]string{}, true, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer())
processedTmpl, err := common.ProcessArgs(tmpl, &task.Arguments, woc.globalParams, map[string]string{}, true, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer())
if err != nil {
woc.markNodeError(node.Name, err)
}
Expand Down
4 changes: 2 additions & 2 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ func (woc *wfOperationCtx) setGlobalParameters(executionParameters wfv1.Argument
}
for _, param := range executionParameters.Parameters {
if param.ValueFrom != nil && param.ValueFrom.ConfigMapKeyRef != nil {
cmValue, err := common.GetConfigMapValue(woc.controller.configMapInformer.GetIndexer(), woc.wf.ObjectMeta.Namespace, param.ValueFrom.ConfigMapKeyRef.Name, param.ValueFrom.ConfigMapKeyRef.Key)
cmValue, err := common.GetConfigMapValue(woc.controller.cmInformer.GetIndexer(), woc.wf.ObjectMeta.Namespace, param.ValueFrom.ConfigMapKeyRef.Name, param.ValueFrom.ConfigMapKeyRef.Key)
if err != nil {
if param.ValueFrom.Default != nil {
woc.globalParams["workflow.parameters."+param.Name] = param.ValueFrom.Default.String()
Expand Down Expand Up @@ -1920,7 +1920,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
}

// Inputs has been processed with arguments already, so pass empty arguments.
processedTmpl, err := common.ProcessArgs(resolvedTmpl, &args, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer())
processedTmpl, err := common.ProcessArgs(resolvedTmpl, &args, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer())
if err != nil {
return woc.initializeNodeOrMarkError(node, nodeName, templateScope, orgTmpl, opts.boundaryID, opts.nodeFlag, err), err
}
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
if tmpl.IsPodType() {
localParams[common.LocalVarPodName] = pod.Name
}
tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer())
tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer())
if err != nil {
return nil, errors.Wrap(err, "", "Failed to substitute the PodSpecPatch variables")
}
Expand Down
Loading