diff --git a/workflow/controller/cache_gc.go b/workflow/controller/cache_gc.go index 5d641c7407ec..d56032588a72 100644 --- a/workflow/controller/cache_gc.go +++ b/workflow/controller/cache_gc.go @@ -25,7 +25,7 @@ func init() { // syncAllCacheForGC syncs all cache for GC func (wfc *WorkflowController) syncAllCacheForGC(ctx context.Context) { - configMaps, err := wfc.configMapInformer.GetIndexer().ByIndex(indexes.ConfigMapLabelsIndex, common.LabelValueTypeConfigMapCache) + configMaps, err := wfc.cmInformer.GetIndexer().ByIndex(indexes.ConfigMapLabelsIndex, common.LabelValueTypeConfigMapCache) if err != nil { log.WithError(err).Error("Failed to get configmaps from informer") return diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index e738f5f64504..76a69536bf62 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" @@ -35,7 +36,6 @@ import ( typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - apiwatch "k8s.io/client-go/tools/watch" "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" @@ -123,7 +123,9 @@ type WorkflowController struct { wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer podInformer cache.SharedIndexInformer - configMapInformer cache.SharedIndexInformer + cmInformer cache.SharedIndexInformer // configmaps with common.LabelKeyConfigMapType: plugins, parameters, memoizations, etc + cmControllerInformer cache.SharedIndexInformer // controller's own configmap + cmSemaphoreInformer cache.SharedIndexInformer // semaphore configmaps wfQueue workqueue.RateLimitingInterface podCleanupQueue workqueue.RateLimitingInterface // pods to be deleted or labelled depend on GC strategy throttler sync.Throttler @@ -302,7 +304,9 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo wfc.podInformer = wfc.newPodInformer(ctx) wfc.updateEstimatorFactory() - wfc.configMapInformer = wfc.newConfigMapInformer() + wfc.cmInformer = wfc.newConfigMapInformer() + wfc.cmControllerInformer = wfc.newConfigMapControllerInformer(ctx) + wfc.cmSemaphoreInformer = wfc.newConfigMapSemaphoreInformer() // Create Synchronization Manager wfc.createSynchronizationManager(ctx) @@ -311,14 +315,12 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo log.Fatal(err) } - if os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false" { - go wfc.runConfigMapWatcher(ctx.Done()) - } - go wfc.wfInformer.Run(ctx.Done()) go wfc.wftmplInformer.Informer().Run(ctx.Done()) go wfc.podInformer.Run(ctx.Done()) - go wfc.configMapInformer.Run(ctx.Done()) + go wfc.cmInformer.Run(ctx.Done()) + go wfc.cmControllerInformer.Run(ctx.Done()) + go wfc.cmSemaphoreInformer.Run(ctx.Done()) go wfc.wfTaskSetInformer.Informer().Run(ctx.Done()) go wfc.artGCTaskInformer.Informer().Run(ctx.Done()) go wfc.taskResultInformer.Run(ctx.Done()) @@ -330,7 +332,9 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo wfc.wfInformer.HasSynced, wfc.wftmplInformer.Informer().HasSynced, wfc.podInformer.HasSynced, - wfc.configMapInformer.HasSynced, + wfc.cmInformer.HasSynced, + wfc.cmControllerInformer.HasSynced, + wfc.cmSemaphoreInformer.HasSynced, wfc.wfTaskSetInformer.Informer().HasSynced, wfc.artGCTaskInformer.Informer().HasSynced, wfc.taskResultInformer.HasSynced, @@ -421,43 +425,12 @@ func (wfc *WorkflowController) initManagers(ctx context.Context) error { return nil } -func (wfc *WorkflowController) runConfigMapWatcher(stopCh <-chan struct{}) { - defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) - - ctx := context.Background() - retryWatcher, err := apiwatch.NewRetryWatcher("1", &cache.ListWatch{ - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return wfc.kubeclientset.CoreV1().ConfigMaps(wfc.managedNamespace).Watch(ctx, metav1.ListOptions{}) - }, - }) - if err != nil { - panic(err) - } - defer retryWatcher.Stop() - - for { - select { - case event := <-retryWatcher.ResultChan(): - cm, ok := event.Object.(*apiv1.ConfigMap) - if !ok { - log.Errorf("invalid config map object received in config watcher. Ignored processing") - continue - } - log.Debugf("received config map %s/%s update", cm.Namespace, cm.Name) - if cm.GetName() == wfc.configController.GetName() && wfc.namespace == cm.GetNamespace() { - log.Infof("Received Workflow Controller config map %s/%s update", cm.Namespace, cm.Name) - wfc.UpdateConfig(ctx) - } - wfc.notifySemaphoreConfigUpdate(cm) - case <-stopCh: - return - } - } -} - // notifySemaphoreConfigUpdate will notify semaphore config update to pending workflows -func (wfc *WorkflowController) notifySemaphoreConfigUpdate(cm *apiv1.ConfigMap) { - wfs, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.SemaphoreConfigIndexName, fmt.Sprintf("%s/%s", cm.Namespace, cm.Name)) +func (wfc *WorkflowController) notifySemaphoreConfigUpdate(ns string, name string) { + key := fmt.Sprintf("%s/%s", ns, name) + log.Debugf("received semaphore config map %s update", key) + + wfs, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.SemaphoreConfigIndexName, key) if err != nil { log.Errorf("failed get the workflow from informer. %v", err) } @@ -1042,9 +1015,6 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) // This function is called when an object is to be removed // from the informer DeleteFunc: func(obj interface{}) { - // IndexerInformer uses a delta queue, therefore for deletes we have to use this - // key function. - // Remove finalizers from Pods if they exist before deletion pods := wfc.kubeclientset.CoreV1().Pods(wfc.GetManagedNamespace()) podList, err := pods.List(ctx, metav1.ListOptions{ @@ -1059,6 +1029,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context) } } + // IndexerInformer uses a delta queue, therefore for deletes we have to use this key function. key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil { wfc.releaseAllWorkflowLocks(obj) @@ -1252,66 +1223,149 @@ func (wfc *WorkflowController) newConfigMapInformer() cache.SharedIndexInformer indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc, }, func(opts *metav1.ListOptions) { - opts.LabelSelector = common.LabelKeyConfigMapType + opts.LabelSelector = common.LabelKeyConfigMapType // only configmaps with this label }) + log.WithField("executorPlugins", wfc.executorPlugins != nil).Info("Plugins") - if wfc.executorPlugins != nil { - //nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580) - indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - cm, err := meta.Accessor(obj) - if err != nil { - return false - } - return cm.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - cm := obj.(*apiv1.ConfigMap) - p, err := plugin.FromConfigMap(cm) - if err != nil { - log.WithField("namespace", cm.GetNamespace()). - WithField("name", cm.GetName()). - WithError(err). - Error("failed to convert configmap to plugin") - return - } - if _, ok := wfc.executorPlugins[cm.GetNamespace()]; !ok { - wfc.executorPlugins[cm.GetNamespace()] = map[string]*spec.Plugin{} - } - wfc.executorPlugins[cm.GetNamespace()][cm.GetName()] = p - log.WithField("namespace", cm.GetNamespace()). - WithField("name", cm.GetName()). - Info("Executor plugin added") - }, - UpdateFunc: func(_, obj interface{}) { - cm := obj.(*apiv1.ConfigMap) - p, err := plugin.FromConfigMap(cm) - if err != nil { - log.WithField("namespace", cm.GetNamespace()). - WithField("name", cm.GetName()). - WithError(err). - Error("failed to convert configmap to plugin") - return - } + if wfc.executorPlugins == nil { + return indexInformer + } - wfc.executorPlugins[cm.GetNamespace()][cm.GetName()] = p - log.WithField("namespace", cm.GetNamespace()). - WithField("name", cm.GetName()). - Info("Executor plugin updated") - }, - DeleteFunc: func(obj interface{}) { - key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - namespace, name, _ := cache.SplitMetaNamespaceKey(key) - delete(wfc.executorPlugins[namespace], name) - log.WithField("namespace", namespace).WithField("name", name).Info("Executor plugin removed") - }, + //nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580) + indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + cmMeta, err := meta.Accessor(obj) + if err != nil { + log.WithError(err). + Error("failed to get configmap metadata") + return false + } + + return isPluginCM(cmMeta) + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + wfc.applyPluginCM(cm, "added") }, - }) + UpdateFunc: func(_, obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + wfc.applyPluginCM(cm, "updated") + }, + DeleteFunc: func(obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + wfc.deletePluginCM(cm) + }, + }, + }) + return indexInformer +} + +func isPluginCM(cmMeta metav1.Object) bool { + return cmMeta.GetLabels()[common.LabelKeyConfigMapType] == common.LabelValueTypeConfigMapExecutorPlugin +} + +func (wfc *WorkflowController) applyPluginCM(cm *apiv1.ConfigMap, verb string) { + p, err := plugin.FromConfigMap(cm) + if err != nil { + log.WithField("namespace", cm.GetNamespace()). + WithField("name", cm.GetName()). + WithError(err). + Error("failed to convert configmap to plugin") + return + } + if _, ok := wfc.executorPlugins[cm.GetNamespace()]; !ok { + wfc.executorPlugins[cm.GetNamespace()] = map[string]*spec.Plugin{} + } + wfc.executorPlugins[cm.GetNamespace()][cm.GetName()] = p + log.WithField("namespace", cm.GetNamespace()). + WithField("name", cm.GetName()). + Infof("Executor plugin %s", verb) +} + +func (wfc *WorkflowController) deletePluginCM(cm *apiv1.ConfigMap) { + key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(cm) + namespace, name, _ := cache.SplitMetaNamespaceKey(key) + delete(wfc.executorPlugins[namespace], name) + log.WithField("namespace", namespace).WithField("name", name).Info("Executor plugin removed") +} + +// Whether to watch the Controller's ConfigMap and semaphore ConfigMaps for run-time changes +var watchControllerSemaphoreConfigMaps = os.Getenv("WATCH_CONTROLLER_SEMAPHORE_CONFIGMAPS") != "false" + +func (wfc *WorkflowController) newConfigMapControllerInformer(ctx context.Context) cache.SharedIndexInformer { + indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetNamespace(), 20*time.Minute, nil, func(opts *metav1.ListOptions) { + opts.FieldSelector = fields.OneTermEqualSelector(metav1.ObjectNameField, wfc.configController.GetName()).String() // only the controller configmap + }) + + if !watchControllerSemaphoreConfigMaps { + return indexInformer + } + + //nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580) + indexInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(_, obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + log.Infof("Received Workflow Controller config map %s/%s update", cm.GetNamespace(), cm.GetName()) + wfc.UpdateConfig(ctx) + }, + }) + return indexInformer +} + +func (wfc *WorkflowController) newConfigMapSemaphoreInformer() cache.SharedIndexInformer { + indexInformer := v1.NewConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{ + cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, + }) + //nolint:errcheck // the error only happens if the informer has already started, which hasn't happened yet (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L446) + indexInformer.SetTransform(func(obj interface{}) (interface{}, error) { + cm, ok := obj.(*apiv1.ConfigMap) + if !ok { + return obj, nil + } + + // only leave name and namespace, remove the rest as we don't use it + cm = cm.DeepCopy() + cm.ObjectMeta = metav1.ObjectMeta{ + Name: cm.Name, + Namespace: cm.Namespace, + } + cm.Data = map[string]string{} + cm.BinaryData = map[string][]byte{} + return cm, nil + }) + + if !watchControllerSemaphoreConfigMaps { + return indexInformer } + + //nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580) + indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + cmMeta, err := meta.Accessor(obj) + if err != nil { + log.WithError(err). + Error("failed to get configmap metadata") + return false + } + + return isSemaphoreCM(cmMeta.GetNamespace(), cmMeta.GetName()) + }, + Handler: cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(_, obj interface{}) { + cm := obj.(*apiv1.ConfigMap) + wfc.notifySemaphoreConfigUpdate(cm.GetNamespace(), cm.GetName()) + }, + }, + }) return indexInformer } +func isSemaphoreCM(ns string, name string) bool { + key := fmt.Sprintf("%s/%s", ns, name) + return indexes.HasSemaphoreKey(key) +} + // call this func whenever the configuration changes, or when the workflow informer changes func (wfc *WorkflowController) updateEstimatorFactory() { wfc.estimatorFactory = estimation.NewEstimatorFactory(wfc.wfInformer, wfc.hydrator, wfc.wfArchive) @@ -1330,11 +1384,15 @@ func (wfc *WorkflowController) setWorkflowDefaults(wf *wfv1.Workflow) error { return nil } +func (wfc *WorkflowController) GetNamespace() string { + return wfc.namespace +} + func (wfc *WorkflowController) GetManagedNamespace() string { if wfc.managedNamespace != "" { return wfc.managedNamespace } - return wfc.Config.Namespace + return wfc.Config.Namespace // if not set, then empty string = cluster-scope. if set, then limit the watch } func (wfc *WorkflowController) getMaxStackDepth() int { diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 0c4af5f4e6dd..4ac6b649d5c3 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -321,8 +321,10 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl wfc.taskResultInformer = wfc.newWorkflowTaskResultInformer() wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates() _ = wfc.addWorkflowInformerHandlers(ctx) - wfc.podInformer = wfc.newPodInformer(ctx) - wfc.configMapInformer = wfc.newConfigMapInformer() + wfc.podInformer, _ = wfc.newPodInformer(ctx) + wfc.cmInformer = wfc.newConfigMapInformer() + wfc.cmControllerInformer = wfc.newConfigMapControllerInformer(ctx) + wfc.cmSemaphoreInformer = wfc.newConfigMapSemaphoreInformer() wfc.createSynchronizationManager(ctx) _ = wfc.initManagers(ctx) @@ -927,10 +929,7 @@ func TestNotifySemaphoreConfigUpdate(t *testing.T) { cancel, controller := newController(wf, wf1, wf2) defer cancel() - cm := apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{ - Name: "my-config", - Namespace: "default", - }} + cmNamespace, cmName := "default", "my-config" assert.Equal(3, controller.wfQueue.Len()) // Remove all Wf from Worker queue @@ -940,7 +939,7 @@ func TestNotifySemaphoreConfigUpdate(t *testing.T) { } assert.Equal(0, controller.wfQueue.Len()) - controller.notifySemaphoreConfigUpdate(&cm) + controller.notifySemaphoreConfigUpdate(cmNamespace, cmName) time.Sleep(2 * time.Second) assert.Equal(2, controller.wfQueue.Len()) } diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index fd60b5262eed..e393b6317574 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -439,7 +439,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex } } - processedTmpl, err := common.ProcessArgs(tmpl, &task.Arguments, woc.globalParams, map[string]string{}, true, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer()) + processedTmpl, err := common.ProcessArgs(tmpl, &task.Arguments, woc.globalParams, map[string]string{}, true, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer()) if err != nil { woc.markNodeError(node.Name, err) } diff --git a/workflow/controller/indexes/workflow_index.go b/workflow/controller/indexes/workflow_index.go index 71ed973b367f..c6b2a315fb1e 100644 --- a/workflow/controller/indexes/workflow_index.go +++ b/workflow/controller/indexes/workflow_index.go @@ -14,6 +14,7 @@ import ( var ( indexWorkflowSemaphoreKeys = os.Getenv("INDEX_WORKFLOW_SEMAPHORE_KEYS") != "false" + semaphoreKeyMap = make(map[string]struct{}) // set of all semaphore keys ) func init() { @@ -70,6 +71,16 @@ func WorkflowSemaphoreKeysIndexFunc() cache.IndexFunc { if err != nil { return nil, nil } - return wf.GetSemaphoreKeys(), nil + keys := wf.GetSemaphoreKeys() + // store all keys while indexing as a side effect + for _, key := range keys { + semaphoreKeyMap[key] = struct{}{} + } + return keys, nil } } + +func HasSemaphoreKey(key string) bool { + _, ok := semaphoreKeyMap[key] + return ok +} diff --git a/workflow/controller/indexes/workflow_index_test.go b/workflow/controller/indexes/workflow_index_test.go index d1ea7128b022..4c3138ee5203 100644 --- a/workflow/controller/indexes/workflow_index_test.go +++ b/workflow/controller/indexes/workflow_index_test.go @@ -87,6 +87,7 @@ func TestWorkflowSemaphoreKeysIndexFunc(t *testing.T) { result, err := WorkflowSemaphoreKeysIndexFunc()(un) assert.NoError(t, err) assert.Len(t, result, 1) + assert.True(t, HasSemaphoreKey(result[0])) }) t.Run("Incomplete", func(t *testing.T) { un, _ := util.ToUnstructured(&wfv1.Workflow{ @@ -106,6 +107,7 @@ func TestWorkflowSemaphoreKeysIndexFunc(t *testing.T) { result, err := WorkflowSemaphoreKeysIndexFunc()(un) assert.NoError(t, err) assert.Len(t, result, 1) + assert.True(t, HasSemaphoreKey(result[0])) }) t.Run("Complete", func(t *testing.T) { un, _ := util.ToUnstructured(&wfv1.Workflow{ diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 6557236cbd8e..89b79a01171e 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -639,7 +639,7 @@ func (woc *wfOperationCtx) setGlobalParameters(executionParameters wfv1.Argument } for _, param := range executionParameters.Parameters { if param.ValueFrom != nil && param.ValueFrom.ConfigMapKeyRef != nil { - cmValue, err := common.GetConfigMapValue(woc.controller.configMapInformer.GetIndexer(), woc.wf.ObjectMeta.Namespace, param.ValueFrom.ConfigMapKeyRef.Name, param.ValueFrom.ConfigMapKeyRef.Key) + cmValue, err := common.GetConfigMapValue(woc.controller.cmInformer.GetIndexer(), woc.wf.ObjectMeta.Namespace, param.ValueFrom.ConfigMapKeyRef.Name, param.ValueFrom.ConfigMapKeyRef.Key) if err != nil { if param.ValueFrom.Default != nil { woc.globalParams["workflow.parameters."+param.Name] = param.ValueFrom.Default.String() @@ -1920,7 +1920,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, } // Inputs has been processed with arguments already, so pass empty arguments. - processedTmpl, err := common.ProcessArgs(resolvedTmpl, &args, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer()) + processedTmpl, err := common.ProcessArgs(resolvedTmpl, &args, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer()) if err != nil { return woc.initializeNodeOrMarkError(node, nodeName, templateScope, orgTmpl, opts.boundaryID, opts.nodeFlag, err), err } diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 7b46e5d5e3c4..085c546ba401 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -365,7 +365,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin if tmpl.IsPodType() { localParams[common.LocalVarPodName] = pod.Name } - tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer()) + tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.cmInformer.GetIndexer()) if err != nil { return nil, errors.Wrap(err, "", "Failed to substitute the PodSpecPatch variables") }