Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refactor runConfigMapWatcher to use Informers. Fixes #11657 #11855

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c4e2d4f
fix: Refactor the func newConfigMapInformer to watch both main and mg…
juranir Apr 2, 2024
ba5eb1f
Merge branch 'argoproj:main' into master
juranir Apr 2, 2024
7295d59
Merge branch 'main' into master
juranir Apr 2, 2024
9006211
refactor out an applyPluginCM func
agilgur5 Apr 5, 2024
dbb84d7
renames
agilgur5 Apr 5, 2024
cd91b70
move some functions around
agilgur5 Apr 6, 2024
8096a06
add ns checks
agilgur5 Apr 6, 2024
6babe0e
comments
agilgur5 Apr 6, 2024
1d4015d
fix some more renames
agilgur5 Apr 6, 2024
7f9c0c5
also refactor out delete plugin for consistency
agilgur5 Apr 6, 2024
59f23bc
properly remove/replace runConfigMapWatcher
agilgur5 Apr 6, 2024
545826f
typings
agilgur5 Apr 6, 2024
963158a
woops not all those typings
agilgur5 Apr 6, 2024
43527e9
use two configmap informers, one for managed ns, other for self ns
agilgur5 Apr 6, 2024
ecfdad2
remove extra spaces
agilgur5 Apr 6, 2024
1250b46
field selector to string
agilgur5 Apr 6, 2024
5ca38e4
fix test init
agilgur5 Apr 6, 2024
909b8e9
revert labelSelector removal? this seems to have broke some E2Es... h…
agilgur5 Apr 6, 2024
edd274c
Revert "revert labelSelector removal? this seems to have broke some E…
agilgur5 Apr 6, 2024
e683292
ah wrong informer
agilgur5 Apr 6, 2024
9e749e2
cm is obj just with a typecast, use the same arg
agilgur5 Apr 8, 2024
579823b
tiny refactor
agilgur5 Apr 8, 2024
700dd37
rename again -- all other informers don't specify 'managed' despite w…
agilgur5 Apr 8, 2024
91000ca
tiny optimization
agilgur5 Apr 8, 2024
abea677
rename miss fix
agilgur5 Apr 8, 2024
f8e0872
refactor out a third informer for semaphores, for better filtering
agilgur5 Apr 8, 2024
8e1fc5d
fix transform
agilgur5 Apr 8, 2024
b96a18a
woops i read the type wrong
agilgur5 Apr 8, 2024
b3e3274
we can remove some error handling as it will never happen
agilgur5 Apr 8, 2024
303501a
Revert "we can remove some error handling as it will never happen"
agilgur5 Apr 8, 2024
c685234
fix unit tests
agilgur5 Apr 8, 2024
91d0fff
ignore the lint explicitly
agilgur5 Apr 8, 2024
efb6b92
remove some other unnecessary error handlers
agilgur5 Apr 8, 2024
2f0f612
fix wfc.Config.Namespace issue and add clarifying comment
agilgur5 Apr 10, 2024
05ee735
add code comment about env var
agilgur5 Apr 19, 2024
8f97dcb
Merge branch 'main' into fix-configmap-informer
agilgur5 Apr 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 119 additions & 42 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

"github.com/upper/db/v4"

"github.com/argoproj/argo-workflows/v3"

"github.com/argoproj/pkg/errors"
syncpkg "github.com/argoproj/pkg/sync"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -39,7 +41,6 @@ import (
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"

"github.com/argoproj/argo-workflows/v3"
"github.com/argoproj/argo-workflows/v3/config"
argoErr "github.com/argoproj/argo-workflows/v3/errors"
"github.com/argoproj/argo-workflows/v3/persist/sqldb"
Expand Down Expand Up @@ -124,6 +125,7 @@ type WorkflowController struct {
cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer
podInformer cache.SharedIndexInformer
configMapInformer cache.SharedIndexInformer
configMapInformerMgmt cache.SharedIndexInformer
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
wfQueue workqueue.RateLimitingInterface
podCleanupQueue workqueue.RateLimitingInterface // pods to be deleted or labelled depend on GC strategy
throttler sync.Throttler
Expand Down Expand Up @@ -319,11 +321,18 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
}
wfc.updateEstimatorFactory()

wfc.configMapInformer, err = wfc.newConfigMapInformer()
wfc.configMapInformer, err = wfc.newConfigMapInformer(wfc.GetNamespace())
if err != nil {
log.Fatal(err)
}

if wfc.isManagedNamespaceDifferent() {
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
wfc.configMapInformerMgmt, err = wfc.newConfigMapInformer(wfc.GetManagedNamespace())
if err != nil {
log.Fatal(err)
}
}

// Create Synchronization Manager
wfc.createSynchronizationManager(ctx)
// init managers: throttler and SynchronizationManager
Expand All @@ -339,6 +348,11 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
go wfc.wftmplInformer.Informer().Run(ctx.Done())
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
go wfc.podInformer.Run(ctx.Done())
go wfc.configMapInformer.Run(ctx.Done())

if wfc.isManagedNamespaceDifferent() {
go wfc.configMapInformerMgmt.Run(ctx.Done())
}

go wfc.wfTaskSetInformer.Informer().Run(ctx.Done())
go wfc.artGCTaskInformer.Informer().Run(ctx.Done())
go wfc.taskResultInformer.Run(ctx.Done())
Expand All @@ -351,6 +365,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
wfc.wftmplInformer.Informer().HasSynced,
wfc.podInformer.HasSynced,
wfc.configMapInformer.HasSynced,
wfc.isConfigMapInformerMgmtSynced,
wfc.wfTaskSetInformer.Informer().HasSynced,
wfc.artGCTaskInformer.Informer().HasSynced,
wfc.taskResultInformer.HasSynced,
Expand Down Expand Up @@ -1270,43 +1285,62 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared
return informer, nil
}

func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer, error) {
indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{
func (wfc *WorkflowController) newConfigMapInformer(ns string) (cache.SharedIndexInformer, error) {
indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, ns, 20*time.Minute, cache.Indexers{
indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc,
juranir marked this conversation as resolved.
Show resolved Hide resolved
}, func(opts *metav1.ListOptions) {
opts.LabelSelector = common.LabelKeyConfigMapType
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
})

log.WithField("executorPlugins", wfc.executorPlugins != nil).Info("Plugins")
if wfc.executorPlugins != nil {
_, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
cm, err := meta.Accessor(obj)
if err != nil {
return false

ctx := context.Background()
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved

_, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
cm, err := meta.Accessor(obj)
if err != nil {
log.WithError(err).
Error("failed to get configmap")
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved

return false
}
//return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
if wfc.isPluginCM(cm) {
log.WithField("executorPlugins", wfc.executorPlugins != nil).
Info("Plugins")
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
}

return wfc.isPluginCM(cm) || wfc.isControllerCM(cm) || wfc.isFromManagedNamespace(cm)
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cm := obj.(*apiv1.ConfigMap)

agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
if !wfc.isPluginCM(cm) {
return
}
return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cm := obj.(*apiv1.ConfigMap)
p, err := plugin.FromConfigMap(cm)
if err != nil {
log.WithField("namespace", cm.GetNamespace()).
WithField("name", cm.GetName()).
WithError(err).
Error("failed to convert configmap to plugin")
return
}
if _, ok := wfc.executorPlugins[cm.GetNamespace()]; !ok {
wfc.executorPlugins[cm.GetNamespace()] = map[string]*spec.Plugin{}
}
wfc.executorPlugins[cm.GetNamespace()][cm.GetName()] = p

p, err := plugin.FromConfigMap(cm)
if err != nil {
log.WithField("namespace", cm.GetNamespace()).
WithField("name", cm.GetName()).
Info("Executor plugin added")
},
UpdateFunc: func(_, obj interface{}) {
cm := obj.(*apiv1.ConfigMap)
WithError(err).
Error("failed to convert configmap to plugin")
return
}
if _, ok := wfc.executorPlugins[cm.GetNamespace()]; !ok {
wfc.executorPlugins[cm.GetNamespace()] = map[string]*spec.Plugin{}
}
wfc.executorPlugins[cm.GetNamespace()][cm.GetName()] = p
log.WithField("namespace", cm.GetNamespace()).
WithField("name", cm.GetName()).
Info("Executor plugin added")
},
UpdateFunc: func(_, obj interface{}) {
cm := obj.(*apiv1.ConfigMap)

agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
if wfc.isPluginCM(cm) {
p, err := plugin.FromConfigMap(cm)
if err != nil {
log.WithField("namespace", cm.GetNamespace()).
Expand All @@ -1320,21 +1354,36 @@ func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer
log.WithField("namespace", cm.GetNamespace()).
WithField("name", cm.GetName()).
Info("Executor plugin updated")
},
DeleteFunc: func(obj interface{}) {
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
namespace, name, _ := cache.SplitMetaNamespaceKey(key)
delete(wfc.executorPlugins[namespace], name)
log.WithField("namespace", namespace).WithField("name", name).Info("Executor plugin removed")
},

} else if wfc.isControllerCM(cm) {
log.Infof("Received Workflow Controller config map %s/%s update", cm.GetNamespace(), cm.GetName())
wfc.UpdateConfig(ctx)
}

if wfc.isFromManagedNamespace(cm) {
wfc.notifySemaphoreConfigUpdate(cm)
}
},
})
if err != nil {
return nil, err
}
DeleteFunc: func(obj interface{}) {
cm := obj.(*apiv1.ConfigMap)

agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
if !wfc.isPluginCM(cm) {
return
}

key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
namespace, name, _ := cache.SplitMetaNamespaceKey(key)
delete(wfc.executorPlugins[namespace], name)
log.WithField("namespace", namespace).WithField("name", name).Info("Executor plugin removed")
},
},
})

if err != nil {
return nil, err
}
return indexInformer, nil

}

// call this func whenever the configuration changes, or when the workflow informer changes
Expand Down Expand Up @@ -1506,3 +1555,31 @@ func (wfc *WorkflowController) newArtGCTaskInformer() (wfextvv1alpha1.WorkflowAr
}
return informer, nil
}

func (wfc *WorkflowController) isManagedNamespaceDifferent() bool {
return wfc.GetNamespace() != wfc.GetManagedNamespace()
}
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved

func (wfc *WorkflowController) GetNamespace() string {
return wfc.Config.Namespace
}
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved

func (wfc *WorkflowController) isPluginCM(cm metav1.Object) bool {
return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin
}
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved

func (wfc *WorkflowController) isControllerCM(cm metav1.Object) bool {
return cm.GetName() == wfc.configController.GetName()
}
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved

func (wfc *WorkflowController) isFromManagedNamespace(cm metav1.Object) bool {
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
return cm.GetNamespace() == wfc.GetManagedNamespace()
}

func (wfc *WorkflowController) isConfigMapInformerMgmtSynced() bool {
if wfc.isManagedNamespaceDifferent() {
return wfc.configMapInformerMgmt.HasSynced()
}

return true
}
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 2 additions & 1 deletion workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates()
_ = wfc.addWorkflowInformerHandlers(ctx)
wfc.podInformer, _ = wfc.newPodInformer(ctx)
wfc.configMapInformer, _ = wfc.newConfigMapInformer()
wfc.configMapInformerMgmt, _ = wfc.newConfigMapInformer(wfc.GetManagedNamespace())
wfc.configMapInformer, _ = wfc.newConfigMapInformer(wfc.GetNamespace())
wfc.createSynchronizationManager(ctx)
_ = wfc.initManagers(ctx)

Expand Down
Loading