From 772d9db78dea3a0ad9280c0d1908e3e1608ba0d8 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Tue, 27 Oct 2020 11:14:07 -0700 Subject: [PATCH 1/5] feat(controller): Deprecate pod-workers. Fixes #4398 Signed-off-by: Alex Collins --- cmd/workflow-controller/main.go | 8 +- workflow/controller/controller.go | 116 +++++--------------- workflow/controller/controller_test.go | 1 - workflow/controller/pod/log.go | 16 --- workflow/controller/pod/significant.go | 76 ------------- workflow/controller/pod/significant_test.go | 85 -------------- 6 files changed, 33 insertions(+), 269 deletions(-) delete mode 100644 workflow/controller/pod/log.go delete mode 100644 workflow/controller/pod/significant.go delete mode 100644 workflow/controller/pod/significant_test.go diff --git a/cmd/workflow-controller/main.go b/cmd/workflow-controller/main.go index 73bd190226f5..f22b94ae6c3f 100644 --- a/cmd/workflow-controller/main.go +++ b/cmd/workflow-controller/main.go @@ -84,7 +84,11 @@ func NewRootCommand() *cobra.Command { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go wfController.Run(ctx, workflowWorkers, podWorkers) + if podWorkers >= 0 { + log.Warn("ignoring --pod-workers: it is no longer supported") + } + + go wfController.Run(ctx, workflowWorkers) // Wait forever select {} @@ -100,7 +104,7 @@ func NewRootCommand() *cobra.Command { command.Flags().StringVar(&logLevel, "loglevel", "info", "Set the logging level. One of: debug|info|warn|error") command.Flags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level") command.Flags().IntVar(&workflowWorkers, "workflow-workers", 32, "Number of workflow workers") - command.Flags().IntVar(&podWorkers, "pod-workers", 32, "Number of pod workers") + command.Flags().IntVar(&podWorkers, "pod-workers", -1, "Number of pod workers") command.Flags().IntVar(&burst, "burst", 30, "Maximum burst for throttle.") command.Flags().Float32Var(&qps, "qps", 20.0, "Queries per second") command.Flags().BoolVar(&namespaced, "namespaced", false, "run workflow-controller as namespaced mode") diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 59f9180efb03..87c3728d074d 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -43,7 +43,6 @@ import ( "github.com/argoproj/argo/workflow/controller/estimation" "github.com/argoproj/argo/workflow/controller/indexes" "github.com/argoproj/argo/workflow/controller/informer" - "github.com/argoproj/argo/workflow/controller/pod" "github.com/argoproj/argo/workflow/cron" "github.com/argoproj/argo/workflow/events" "github.com/argoproj/argo/workflow/hydrator" @@ -84,7 +83,6 @@ type WorkflowController struct { cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer podInformer cache.SharedIndexInformer wfQueue workqueue.RateLimitingInterface - podQueue workqueue.RateLimitingInterface completedPods chan string gcPods chan string // pods to be deleted depend on GC strategy throttler sync.Throttler @@ -141,7 +139,6 @@ func NewWorkflowController(restConfig *rest.Config, kubeclientset kubernetes.Int wfc.wfQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "workflow_queue") wfc.throttler = sync.NewThrottler(0, wfc.wfQueue) wfc.throttler.SetParallelism(wfc.getParallelism()) - wfc.podQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pod_queue") return &wfc, nil } @@ -169,12 +166,11 @@ var indexers = cache.Indexers{ } // Run starts an Workflow resource controller -func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers int) { +func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers int) { defer wfc.wfQueue.ShutDown() - defer wfc.podQueue.ShutDown() log.WithField("version", argo.GetVersion().Version).Info("Starting Workflow Controller") - log.Infof("Workers: workflow: %d, pod: %d", wfWorkers, podWorkers) + log.Infof("Workers: workflow: %d,", wfWorkers) wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListOptions, indexers) wfc.wftmplInformer = informer.NewTolerantWorkflowTemplateInformer(wfc.dynamicInterface, workflowTemplateResyncPeriod, wfc.managedNamespace) @@ -215,9 +211,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in for i := 0; i < wfWorkers; i++ { go wait.Until(wfc.runWorker, time.Second, ctx.Done()) } - for i := 0; i < podWorkers; i++ { - go wait.Until(wfc.podWorker, time.Second, ctx.Done()) - } <-ctx.Done() } @@ -562,59 +555,30 @@ func (wfc *WorkflowController) processNextItem() bool { return true } -func (wfc *WorkflowController) podWorker() { - for wfc.processNextPodItem() { - } -} - -// processNextPodItem is the worker logic for handling pod updates. -// For pods updates, this simply means to "wake up" the workflow by -// adding the corresponding workflow key into the workflow workqueue. -func (wfc *WorkflowController) processNextPodItem() bool { - key, quit := wfc.podQueue.Get() - if quit { - return false - } - defer wfc.podQueue.Done(key) - - obj, exists, err := wfc.podInformer.GetIndexer().GetByKey(key.(string)) - if err != nil { - log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get pod from informer index") - return true - } - if !exists { - // we can get here if pod was queued into the pod workqueue, - // but it was either deleted or labeled completed by the time - // we dequeued it. - return true - } - - err = wfc.enqueueWfFromPodLabel(obj) - if err != nil { - log.WithError(err).Warnf("Failed to enqueue the workflow for %s", key) - } - return true -} - // enqueueWfFromPodLabel will extract the workflow name from pod label and // enqueue workflow for processing -func (wfc *WorkflowController) enqueueWfFromPodLabel(obj interface{}) error { - pod, ok := obj.(*apiv1.Pod) - if !ok { - return fmt.Errorf("Key in index is not a pod") - } - if pod.Labels == nil { - return fmt.Errorf("Pod did not have labels") - } - workflowName, ok := pod.Labels[common.LabelKeyWorkflow] - if !ok { - // Ignore pods unrelated to workflow (this shouldn't happen unless the watch is setup incorrectly) - return fmt.Errorf("Watch returned pod unrelated to any workflow") +func (wfc *WorkflowController) enqueueWfFromPodLabel(obj interface{}) { + err := func() error { + pod, ok := obj.(*apiv1.Pod) + if !ok { + return fmt.Errorf("pey in index is not a pod") + } + if pod.Labels == nil { + return fmt.Errorf("pod did not have labels") + } + workflowName, ok := pod.Labels[common.LabelKeyWorkflow] + if !ok { + // Ignore pods unrelated to workflow (this shouldn't happen unless the watch is setup incorrectly) + return fmt.Errorf("watch returned pod unrelated to any workflow") + } + // add this change after 1s - this reduces the number of workflow reconciliations - + //with each reconciliation doing more work + wfc.wfQueue.AddAfter(pod.ObjectMeta.Namespace+"/"+workflowName, enoughTimeForInformerSync) + return nil + }() + if err != nil { + log.Error(err) } - // add this change after 1s - this reduces the number of workflow reconciliations - - //with each reconciliation doing more work - wfc.wfQueue.AddAfter(pod.ObjectMeta.Namespace+"/"+workflowName, enoughTimeForInformerSync) - return nil } func (wfc *WorkflowController) tweakListOptions(options *metav1.ListOptions) { @@ -818,37 +782,11 @@ func (wfc *WorkflowController) newPodInformer() cache.SharedIndexInformer { }) informer.AddEventHandler( cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err != nil { - return - } - wfc.podQueue.Add(key) - }, - UpdateFunc: func(old, new interface{}) { - key, err := cache.MetaNamespaceKeyFunc(new) - if err != nil { - return - } - oldPod, newPod := old.(*apiv1.Pod), new.(*apiv1.Pod) - if oldPod.ResourceVersion == newPod.ResourceVersion { - return - } - if !pod.SignificantPodChange(oldPod, newPod) { - log.WithField("key", key).Info("insignificant pod change") - pod.LogChanges(oldPod, newPod) - return - } - wfc.podQueue.Add(key) - }, - DeleteFunc: func(obj interface{}) { - // IndexerInformer uses a delta queue, therefore for deletes we have to use this - // key function. - - // Enqueue the workflow for deleted pod - _ = wfc.enqueueWfFromPodLabel(obj) - + AddFunc: wfc.enqueueWfFromPodLabel, + UpdateFunc: func(_, obj interface{}) { + wfc.enqueueWfFromPodLabel(obj) }, + DeleteFunc: wfc.enqueueWfFromPodLabel, }, ) return informer diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index f29a9ab125be..5bc4ec3cea3a 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -149,7 +149,6 @@ func newController(objects ...runtime.Object) (context.CancelFunc, *WorkflowCont wftmplInformer: wftmplInformer, cwftmplInformer: cwftmplInformer, wfQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - podQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), workflowKeyLock: sync.NewKeyLock(), wfArchive: sqldb.NullWorkflowArchive, hydrator: hydratorfake.Noop, diff --git a/workflow/controller/pod/log.go b/workflow/controller/pod/log.go deleted file mode 100644 index bde0b40410fc..000000000000 --- a/workflow/controller/pod/log.go +++ /dev/null @@ -1,16 +0,0 @@ -package pod - -import ( - "encoding/json" - - log "github.com/sirupsen/logrus" - apiv1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/strategicpatch" -) - -func LogChanges(oldPod *apiv1.Pod, newPod *apiv1.Pod) { - a, _ := json.Marshal(oldPod) - b, _ := json.Marshal(newPod) - patch, _ := strategicpatch.CreateTwoWayMergePatch(a, b, &apiv1.Pod{}) - log.Debugln(string(patch)) -} diff --git a/workflow/controller/pod/significant.go b/workflow/controller/pod/significant.go deleted file mode 100644 index e2afbc51eb00..000000000000 --- a/workflow/controller/pod/significant.go +++ /dev/null @@ -1,76 +0,0 @@ -package pod - -import ( - "os" - - apiv1 "k8s.io/api/core/v1" -) - -func SignificantPodChange(from *apiv1.Pod, to *apiv1.Pod) bool { - return os.Getenv("ALL_POD_CHANGES_SIGNIFICANT") == "true" || - from.Spec.NodeName != to.Spec.NodeName || - from.Status.Phase != to.Status.Phase || - from.Status.Message != to.Status.Message || - from.Status.PodIP != to.Status.PodIP || - from.GetDeletionTimestamp() != to.GetDeletionTimestamp() || - significantAnnotationChange(from.Annotations, to.Annotations) || - significantContainerStatusesChange(from.Status.ContainerStatuses, to.Status.ContainerStatuses) || - significantContainerStatusesChange(from.Status.InitContainerStatuses, to.Status.InitContainerStatuses) || - significantConditionsChange(from.Status.Conditions, to.Status.Conditions) -} - -func significantAnnotationChange(from map[string]string, to map[string]string) bool { - if len(from) != len(to) { - return true - } - for k, v := range from { - if to[k] != v { - return true - } - } - // as both annotations must be the same length, the above loop will always catch all changes, - // we don't need to range with `to` - return false -} - -func significantContainerStatusesChange(from []apiv1.ContainerStatus, to []apiv1.ContainerStatus) bool { - if len(from) != len(to) { - return true - } - statuses := map[string]apiv1.ContainerStatus{} - for _, s := range from { - statuses[s.Name] = s - } - for _, s := range to { - if significantContainerStatusChange(statuses[s.Name], s) { - return true - } - } - return false -} - -func significantContainerStatusChange(from apiv1.ContainerStatus, to apiv1.ContainerStatus) bool { - return from.Ready != to.Ready || significantContainerStateChange(from.State, to.State) -} - -func significantContainerStateChange(from apiv1.ContainerState, to apiv1.ContainerState) bool { - // waiting has two significant fields and either could potentially change - return to.Waiting != nil && (from.Waiting == nil || from.Waiting.Message != to.Waiting.Message || from.Waiting.Reason != to.Waiting.Reason) || - // running only has one field which is immutable - so any change is significant - (to.Running != nil && from.Running == nil) || - // I'm assuming this field is immutable - so any change is significant - (to.Terminated != nil && from.Terminated == nil) -} - -func significantConditionsChange(from []apiv1.PodCondition, to []apiv1.PodCondition) bool { - if len(from) != len(to) { - return true - } - for i, a := range from { - b := to[i] - if a.Message != b.Message || a.Reason != b.Reason { - return true - } - } - return false -} diff --git a/workflow/controller/pod/significant_test.go b/workflow/controller/pod/significant_test.go deleted file mode 100644 index 4050911f16e4..000000000000 --- a/workflow/controller/pod/significant_test.go +++ /dev/null @@ -1,85 +0,0 @@ -package pod - -import ( - "os" - "testing" - - "github.com/stretchr/testify/assert" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func Test_SgnificantPodChange(t *testing.T) { - t.Run("NoChange", func(t *testing.T) { - assert.False(t, SignificantPodChange(&corev1.Pod{}, &corev1.Pod{})) - }) - t.Run("ALL_POD_CHANGES_SIGNIFICANT", func(t *testing.T) { - _ = os.Setenv("ALL_POD_CHANGES_SIGNIFICANT", "true") - defer func() { _ = os.Unsetenv("ALL_POD_CHANGES_SIGNIFICANT") }() - assert.True(t, SignificantPodChange(&corev1.Pod{}, &corev1.Pod{})) - }) - t.Run("DeletionTimestamp", func(t *testing.T) { - now := metav1.Now() - assert.True(t, SignificantPodChange(&corev1.Pod{}, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &now}}), "deletion timestamp change") - }) - t.Run("Annotations", func(t *testing.T) { - assert.True(t, SignificantPodChange(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{}}}, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"foo": "bar"}}}), "new annotation") - assert.True(t, SignificantPodChange(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"foo": "bar"}}}, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"foo": "baz"}}}), "changed annotation") - assert.True(t, SignificantPodChange(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"foo": "bar"}}}, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{}}}), "deleted annotation") - }) - t.Run("Spec", func(t *testing.T) { - assert.True(t, SignificantPodChange(&corev1.Pod{}, &corev1.Pod{Spec: corev1.PodSpec{NodeName: "from"}}), "Node name change") - }) - t.Run("Status", func(t *testing.T) { - assert.True(t, SignificantPodChange(&corev1.Pod{}, &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodRunning}}), "Phase change") - assert.True(t, SignificantPodChange(&corev1.Pod{}, &corev1.Pod{Status: corev1.PodStatus{PodIP: "my-ip"}}), "Pod IP change") - }) - t.Run("ContainerStatuses", func(t *testing.T) { - assert.True(t, SignificantPodChange(&corev1.Pod{}, &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{}}}}), "Number of container status changes") - assert.True(t, SignificantPodChange( - &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{}}}}, - &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{Ready: true}}}}, - ), "Ready of container status changes") - assert.True(t, SignificantPodChange( - &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{}}}}, - &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{State: corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{}}}}}}, - ), "Waiting of container status changes") - assert.True(t, SignificantPodChange( - &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{State: corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{}}}}}}, - &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{State: corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{Reason: "my-reason"}}}}}}, - ), "Waiting reason of container status changes") - assert.True(t, SignificantPodChange( - &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{State: corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{}}}}}}, - &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{State: corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{Message: "my-message"}}}}}}, - ), "Waiting message of container status changes") - assert.True(t, SignificantPodChange( - &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{State: corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{}}}}}}, - &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{State: corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{Message: "my-message"}}}}}}, - ), "Waiting message of container status changes") - assert.True(t, SignificantPodChange( - &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{}}}}, - &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}}}}}, - ), "Running container status changes") - assert.True(t, SignificantPodChange( - &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{}}}}, - &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{{State: corev1.ContainerState{Terminated: &corev1.ContainerStateTerminated{}}}}}}, - ), "Terminate container status changes") - }) - t.Run("InitContainerStatuses", func(t *testing.T) { - assert.True(t, SignificantPodChange(&corev1.Pod{}, &corev1.Pod{Status: corev1.PodStatus{InitContainerStatuses: []corev1.ContainerStatus{{}}}}), "Number of container status changes") - }) - t.Run("Conditions", func(t *testing.T) { - assert.True(t, SignificantPodChange( - &corev1.Pod{}, - &corev1.Pod{Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{}}}}), - "condition added") - assert.True(t, SignificantPodChange( - &corev1.Pod{Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{}}}}, - &corev1.Pod{Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{Reason: "es"}}}}, - ), "condition changed") - assert.True(t, SignificantPodChange( - &corev1.Pod{Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{}}}}, - &corev1.Pod{}, - ), "condition removed") - }) -} From 22f158a7623b265fe5141d28fc5451883e50e90c Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Tue, 27 Oct 2020 12:28:30 -0700 Subject: [PATCH 2/5] no-sig: M workflow/controller/controller.go Signed-off-by: Alex Collins --- workflow/controller/controller.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 87c3728d074d..6ed47f68a7b1 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -749,10 +749,7 @@ func (wfc *WorkflowController) newWorkflowPodWatch() *cache.ListWatch { c := wfc.kubeclientset.CoreV1().RESTClient() resource := "pods" namespace := wfc.GetManagedNamespace() - // completed=false - incompleteReq, _ := labels.NewRequirement(common.LabelKeyCompleted, selection.Equals, []string{"false"}) labelSelector := labels.NewSelector(). - Add(*incompleteReq). Add(util.InstanceIDRequirement(wfc.Config.InstanceID)) listFunc := func(options metav1.ListOptions) (runtime.Object, error) { From 0c8910e6711d2c8ba1b868c802b2870e46ec8c92 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Tue, 27 Oct 2020 13:18:21 -0700 Subject: [PATCH 3/5] no-sig: M workflow/controller/controller.go Signed-off-by: Alex Collins --- workflow/controller/controller.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 6ed47f68a7b1..4c41aeb5b459 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -749,7 +749,9 @@ func (wfc *WorkflowController) newWorkflowPodWatch() *cache.ListWatch { c := wfc.kubeclientset.CoreV1().RESTClient() resource := "pods" namespace := wfc.GetManagedNamespace() + requirement, _ := labels.NewRequirement(common.LabelKeyWorkflow, selection.Exists, []string{}) labelSelector := labels.NewSelector(). + Add(*requirement). Add(util.InstanceIDRequirement(wfc.Config.InstanceID)) listFunc := func(options metav1.ListOptions) (runtime.Object, error) { From dd58b467127d5d5f901738e89179360feaf23769 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Tue, 27 Oct 2020 14:15:53 -0700 Subject: [PATCH 4/5] no-sig: M workflow/controller/controller.go Signed-off-by: Alex Collins --- workflow/controller/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 4c41aeb5b459..1ab3bc06fc8e 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -561,7 +561,7 @@ func (wfc *WorkflowController) enqueueWfFromPodLabel(obj interface{}) { err := func() error { pod, ok := obj.(*apiv1.Pod) if !ok { - return fmt.Errorf("pey in index is not a pod") + return fmt.Errorf("key in index is not a pod") } if pod.Labels == nil { return fmt.Errorf("pod did not have labels") From 12aae8228372a411bd63f556b6e2eca11664b286 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Thu, 19 Nov 2020 09:37:20 -0800 Subject: [PATCH 5/5] no-sig: M workflow/controller/controller.go Signed-off-by: Alex Collins --- workflow/controller/controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 169113301230..05ae3a440200 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -803,9 +803,9 @@ func (wfc *WorkflowController) archiveWorkflowAux(obj interface{}) error { func (wfc *WorkflowController) newWorkflowPodWatch() *cache.ListWatch { c := wfc.kubeclientset.CoreV1().Pods(wfc.GetManagedNamespace()) // completed=false - requirement, _ := labels.NewRequirement(common.LabelKeyWorkflow, selection.Exists, []string{}) + incompleteReq, _ := labels.NewRequirement(common.LabelKeyCompleted, selection.Equals, []string{"false"}) labelSelector := labels.NewSelector(). - Add(*requirement). + Add(*incompleteReq). Add(util.InstanceIDRequirement(wfc.Config.InstanceID)) listFunc := func(options metav1.ListOptions) (runtime.Object, error) {