Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(controller): Deprecate pod-workers. Fixes #4398 #4403

Closed
wants to merge 10 commits into from
8 changes: 6 additions & 2 deletions cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ func NewRootCommand() *cobra.Command {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go wfController.Run(ctx, workflowWorkers, podWorkers)
if podWorkers >= 0 {
log.Warn("ignoring --pod-workers: it is no longer supported")
}

go wfController.Run(ctx, workflowWorkers)

// Wait forever
select {}
Expand All @@ -100,7 +104,7 @@ func NewRootCommand() *cobra.Command {
command.Flags().StringVar(&logLevel, "loglevel", "info", "Set the logging level. One of: debug|info|warn|error")
command.Flags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level")
command.Flags().IntVar(&workflowWorkers, "workflow-workers", 32, "Number of workflow workers")
command.Flags().IntVar(&podWorkers, "pod-workers", 32, "Number of pod workers")
command.Flags().IntVar(&podWorkers, "pod-workers", -1, "Number of pod workers")
command.Flags().IntVar(&burst, "burst", 30, "Maximum burst for throttle.")
command.Flags().Float32Var(&qps, "qps", 20.0, "Queries per second")
command.Flags().BoolVar(&namespaced, "namespaced", false, "run workflow-controller as namespaced mode")
Expand Down
116 changes: 27 additions & 89 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/argoproj/argo/workflow/controller/estimation"
"github.com/argoproj/argo/workflow/controller/indexes"
"github.com/argoproj/argo/workflow/controller/informer"
"github.com/argoproj/argo/workflow/controller/pod"
"github.com/argoproj/argo/workflow/cron"
"github.com/argoproj/argo/workflow/events"
"github.com/argoproj/argo/workflow/hydrator"
Expand Down Expand Up @@ -87,7 +86,6 @@ type WorkflowController struct {
cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer
podInformer cache.SharedIndexInformer
wfQueue workqueue.RateLimitingInterface
podQueue workqueue.RateLimitingInterface
completedPods chan string
gcPods chan string // pods to be deleted depend on GC strategy
throttler sync.Throttler
Expand Down Expand Up @@ -143,7 +141,6 @@ func NewWorkflowController(restConfig *rest.Config, kubeclientset kubernetes.Int
workqueue.SetProvider(wfc.metrics)
wfc.wfQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "workflow_queue")
wfc.throttler = wfc.newThrottler()
wfc.podQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pod_queue")

return &wfc, nil
}
Expand Down Expand Up @@ -174,12 +171,11 @@ var indexers = cache.Indexers{
}

// Run starts a Workflow resource controller
func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers int) {
func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers int) {
defer wfc.wfQueue.ShutDown()
defer wfc.podQueue.ShutDown()

log.WithField("version", argo.GetVersion().Version).Info("Starting Workflow Controller")
log.Infof("Workers: workflow: %d, pod: %d", wfWorkers, podWorkers)
log.Infof("Workers: workflow: %d,", wfWorkers)

wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListOptions, indexers)
wfc.wftmplInformer = informer.NewTolerantWorkflowTemplateInformer(wfc.dynamicInterface, workflowTemplateResyncPeriod, wfc.managedNamespace)
Expand Down Expand Up @@ -214,9 +210,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in
for i := 0; i < wfWorkers; i++ {
go wait.Until(wfc.runWorker, time.Second, ctx.Done())
}
for i := 0; i < podWorkers; i++ {
go wait.Until(wfc.podWorker, time.Second, ctx.Done())
}
<-ctx.Done()
}

Expand Down Expand Up @@ -617,59 +610,30 @@ func (wfc *WorkflowController) processNextItem() bool {
return true
}

// podWorker calls processNextPodItem repeatedly and returns as soon as that
// call reports false. The loop body is intentionally empty: all of the work
// happens inside the loop condition.
func (wfc *WorkflowController) podWorker() {
for wfc.processNextPodItem() {
}
}

// processNextPodItem is the worker logic for handling pod updates.
// For pod updates, this simply means to "wake up" the workflow by
// adding the corresponding workflow key into the workflow workqueue.
//
// It takes one key from podQueue, looks the pod up in the pod informer's
// indexer, and hands the object to enqueueWfFromPodLabel. It returns false
// only when podQueue.Get reports quit; every other path returns true so the
// calling loop keeps running. Lookup and enqueue failures are logged here
// rather than returned to the caller.
func (wfc *WorkflowController) processNextPodItem() bool {
key, quit := wfc.podQueue.Get()
if quit {
return false
}
// Deferred so that Done(key) runs on every return path below.
defer wfc.podQueue.Done(key)

// Unchecked type assertion: this panics if a non-string key was queued.
obj, exists, err := wfc.podInformer.GetIndexer().GetByKey(key.(string))
if err != nil {
log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get pod from informer index")
return true
}
if !exists {
// we can get here if pod was queued into the pod workqueue,
// but it was either deleted or labeled completed by the time
// we dequeued it.
return true
}

err = wfc.enqueueWfFromPodLabel(obj)
if err != nil {
// An enqueue failure is only logged as a warning; the item is not re-queued here.
log.WithError(err).Warnf("Failed to enqueue the workflow for %s", key)
}
return true
}

// enqueueWfFromPodLabel will extract the workflow name from pod label and
// enqueue workflow for processing
func (wfc *WorkflowController) enqueueWfFromPodLabel(obj interface{}) error {
pod, ok := obj.(*apiv1.Pod)
if !ok {
return fmt.Errorf("Key in index is not a pod")
}
if pod.Labels == nil {
return fmt.Errorf("Pod did not have labels")
}
workflowName, ok := pod.Labels[common.LabelKeyWorkflow]
if !ok {
// Ignore pods unrelated to workflow (this shouldn't happen unless the watch is set up incorrectly)
return fmt.Errorf("Watch returned pod unrelated to any workflow")
func (wfc *WorkflowController) enqueueWfFromPodLabel(obj interface{}) {
err := func() error {
pod, ok := obj.(*apiv1.Pod)
if !ok {
return fmt.Errorf("key in index is not a pod")
}
if pod.Labels == nil {
return fmt.Errorf("pod did not have labels")
}
workflowName, ok := pod.Labels[common.LabelKeyWorkflow]
if !ok {
// Ignore pods unrelated to workflow (this shouldn't happen unless the watch is set up incorrectly)
return fmt.Errorf("watch returned pod unrelated to any workflow")
}
// add this change after 1s - this reduces the number of workflow reconciliations -
// with each reconciliation doing more work
wfc.wfQueue.AddAfter(pod.ObjectMeta.Namespace+"/"+workflowName, enoughTimeForInformerSync)
return nil
}()
if err != nil {
log.Error(err)
}
// add this change after 1s - this reduces the number of workflow reconciliations -
// with each reconciliation doing more work
wfc.wfQueue.AddAfter(pod.ObjectMeta.Namespace+"/"+workflowName, enoughTimeForInformerSync)
return nil
}

func (wfc *WorkflowController) tweakListOptions(options *metav1.ListOptions) {
Expand Down Expand Up @@ -863,37 +827,11 @@ func (wfc *WorkflowController) newPodInformer() cache.SharedIndexInformer {
})
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return
}
wfc.podQueue.Add(key)
},
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err != nil {
return
}
oldPod, newPod := old.(*apiv1.Pod), new.(*apiv1.Pod)
if oldPod.ResourceVersion == newPod.ResourceVersion {
return
}
if !pod.SignificantPodChange(oldPod, newPod) {
log.WithField("key", key).Info("insignificant pod change")
pod.LogChanges(oldPod, newPod)
return
}
wfc.podQueue.Add(key)
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.

// Enqueue the workflow for deleted pod
_ = wfc.enqueueWfFromPodLabel(obj)

AddFunc: wfc.enqueueWfFromPodLabel,
UpdateFunc: func(_, obj interface{}) {
wfc.enqueueWfFromPodLabel(obj)
},
DeleteFunc: wfc.enqueueWfFromPodLabel,
},
)
return informer
Expand Down
1 change: 0 additions & 1 deletion workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
wfc.metrics = metrics.New(metrics.ServerConfig{}, metrics.ServerConfig{})
wfc.wfQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
wfc.throttler = wfc.newThrottler()
wfc.podQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
}

// always compare to WorkflowController.Run to see what this block of code should be doing
Expand Down
16 changes: 0 additions & 16 deletions workflow/controller/pod/log.go

This file was deleted.

76 changes: 0 additions & 76 deletions workflow/controller/pod/significant.go

This file was deleted.

85 changes: 0 additions & 85 deletions workflow/controller/pod/significant_test.go

This file was deleted.