diff --git a/persist/sqldb/offload_node_status_repo.go b/persist/sqldb/offload_node_status_repo.go index 50e8cbc0e12c..5c08b7748c77 100644 --- a/persist/sqldb/offload_node_status_repo.go +++ b/persist/sqldb/offload_node_status_repo.go @@ -4,7 +4,9 @@ import ( "encoding/json" "fmt" "hash/fnv" + "os" "strings" + "time" log "github.com/sirupsen/logrus" "upper.io/db.v3" @@ -27,8 +29,19 @@ type OffloadNodeStatusRepo interface { IsEnabled() bool } -func NewOffloadNodeStatusRepo(session sqlbuilder.Database, clusterName, tableName string) OffloadNodeStatusRepo { - return &nodeOffloadRepo{session: session, clusterName: clusterName, tableName: tableName} +func NewOffloadNodeStatusRepo(session sqlbuilder.Database, clusterName, tableName string) (OffloadNodeStatusRepo, error) { + // this environment variable allows you to make Argo Workflows delete offloaded data more or less aggressively, + // useful for testing + text, ok := os.LookupEnv("OFFLOAD_NODE_STATUS_TTL") + if !ok { + text = "5m" + } + ttl, err := time.ParseDuration(text) + if err != nil { + return nil, err + } + log.WithField("ttl", ttl).Info("Node status offloading config") + return &nodeOffloadRepo{session: session, clusterName: clusterName, tableName: tableName, ttl: ttl}, nil } type nodesRecord struct { @@ -42,6 +55,8 @@ type nodeOffloadRepo struct { session sqlbuilder.Database clusterName string tableName string + // time to live - at what ttl an offload becomes old + ttl time.Duration } func (wdc *nodeOffloadRepo) IsEnabled() bool { @@ -88,7 +103,7 @@ func (wdc *nodeOffloadRepo) Save(uid, namespace string, nodes wfv1.Nodes) (strin logCtx.WithField("err", err).Info("Ignoring duplicate key error") } - logCtx.Info("Nodes offloaded, cleaning up old offloads") + logCtx.Debug("Nodes offloaded, cleaning up old offloads") // This might fail, which kind of fine (maybe a bug). // It might not delete all records, which is also fine, as we always key on resource version. @@ -98,7 +113,7 @@ func (wdc *nodeOffloadRepo) Save(uid, namespace string, nodes wfv1.Nodes) (strin Where(db.Cond{"clustername": wdc.clusterName}). And(db.Cond{"uid": uid}). And(db.Cond{"version <>": version}). - And("updatedat < current_timestamp - interval '5' minute"). + And(wdc.oldOffload()). Exec() if err != nil { return "", err @@ -177,7 +192,7 @@ func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) ([]UUIDVersion, er From(wdc.tableName). Where(db.Cond{"clustername": wdc.clusterName}). And(namespaceEqual(namespace)). - And("updatedat < current_timestamp - interval '5' minute"). + And(wdc.oldOffload()). All(&records) if err != nil { return nil, err @@ -210,3 +225,7 @@ func (wdc *nodeOffloadRepo) Delete(uid, version string) error { logCtx.WithField("rowsAffected", rowsAffected).Debug("Deleted offloaded nodes") return nil } + +func (wdc *nodeOffloadRepo) oldOffload() string { + return fmt.Sprintf("updatedat < current_timestamp - interval '%d' second", int(wdc.ttl.Seconds())) +} diff --git a/server/apiserver/argoserver.go b/server/apiserver/argoserver.go index 540e288a2840..c26f737669d2 100644 --- a/server/apiserver/argoserver.go +++ b/server/apiserver/argoserver.go @@ -124,10 +124,14 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st if err != nil { log.Fatal(err) } - log.WithField("nodeStatusOffload", persistence.NodeStatusOffload).Info("Offload node status") - if persistence.NodeStatusOffload { - offloadRepo = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName) + // we always enable node offload, as this is read-only for the Argo Server, i.e. you can turn it off if you + // like and the controller won't offload newly created workflows, but you can still read them + offloadRepo, err = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName) + if err != nil { + log.Fatal(err) } + // we always enable the archive for the Argo Server, as the Argo Server does not write records, so you can + // disable the archiving - and still read old records wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName()) } artifactServer := artifacts.NewArtifactServer(as.authenticator, offloadRepo, wfArchive) diff --git a/test/e2e/fixtures/persistence.go b/test/e2e/fixtures/persistence.go index 3fa277f89ee5..a06291341812 100644 --- a/test/e2e/fixtures/persistence.go +++ b/test/e2e/fixtures/persistence.go @@ -38,7 +38,10 @@ func newPersistence(kubeClient kubernetes.Interface) *Persistence { if err != nil { panic(err) } - offloadNodeStatusRepo := sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName) + offloadNodeStatusRepo, err := sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName) + if err != nil { + panic(err) + } workflowArchive := sqldb.NewWorkflowArchive(session, persistence.GetClusterName()) return &Persistence{session, offloadNodeStatusRepo, workflowArchive} } else { diff --git a/test/e2e/manifests/mysql.yaml b/test/e2e/manifests/mysql.yaml index 2013f5eb6a36..0cbf301c9d47 100644 --- a/test/e2e/manifests/mysql.yaml +++ b/test/e2e/manifests/mysql.yaml @@ -467,6 +467,8 @@ spec: env: - name: ALWAYS_OFFLOAD_NODE_STATUS value: "true" + - name: OFFLOAD_NODE_STATUS_TTL + value: 30s - name: WORKFLOW_GC_PERIOD value: 30s - name: UPPERIO_DB_DEBUG diff --git a/test/e2e/manifests/mysql/overlays/workflow-controller-deployment.yaml b/test/e2e/manifests/mysql/overlays/workflow-controller-deployment.yaml index 2192bd96dbfa..db7cfe053fec 100644 --- a/test/e2e/manifests/mysql/overlays/workflow-controller-deployment.yaml +++ b/test/e2e/manifests/mysql/overlays/workflow-controller-deployment.yaml @@ -23,8 +23,11 @@ - op: add path: /spec/template/spec/containers/0/env value: + # these are designed to force offloading on and make GC happen regularly so we shake out any bugs - name: ALWAYS_OFFLOAD_NODE_STATUS value: "true" + - name: OFFLOAD_NODE_STATUS_TTL + value: 30s - name: WORKFLOW_GC_PERIOD value: 30s - name: UPPERIO_DB_DEBUG diff --git a/test/e2e/manifests/no-db.yaml b/test/e2e/manifests/no-db.yaml index eb332dfcb663..1b56b951c1ca 100644 --- a/test/e2e/manifests/no-db.yaml +++ b/test/e2e/manifests/no-db.yaml @@ -382,6 +382,8 @@ spec: env: - name: ALWAYS_OFFLOAD_NODE_STATUS value: "true" + - name: OFFLOAD_NODE_STATUS_TTL + value: 30s - name: WORKFLOW_GC_PERIOD value: 30s - name: UPPERIO_DB_DEBUG diff --git a/test/e2e/manifests/no-db/overlays/workflow-controller-deployment.yaml b/test/e2e/manifests/no-db/overlays/workflow-controller-deployment.yaml index 2192bd96dbfa..db7cfe053fec 100644 --- a/test/e2e/manifests/no-db/overlays/workflow-controller-deployment.yaml +++ b/test/e2e/manifests/no-db/overlays/workflow-controller-deployment.yaml @@ -23,8 +23,11 @@ - op: add path: /spec/template/spec/containers/0/env value: + # these are designed to force offloading on and make GC happen regularly so we shake out any bugs - name: ALWAYS_OFFLOAD_NODE_STATUS value: "true" + - name: OFFLOAD_NODE_STATUS_TTL + value: 30s - name: WORKFLOW_GC_PERIOD value: 30s - name: UPPERIO_DB_DEBUG diff --git a/test/e2e/manifests/postgres.yaml b/test/e2e/manifests/postgres.yaml index 9444b352191f..7d2951a6d20a 100644 --- a/test/e2e/manifests/postgres.yaml +++ b/test/e2e/manifests/postgres.yaml @@ -459,6 +459,8 @@ spec: env: - name: ALWAYS_OFFLOAD_NODE_STATUS value: "true" + - name: OFFLOAD_NODE_STATUS_TTL + value: 30s - name: WORKFLOW_GC_PERIOD value: 30s - name: UPPERIO_DB_DEBUG diff --git a/test/e2e/manifests/postgres/overlays/workflow-controller-deployment.yaml b/test/e2e/manifests/postgres/overlays/workflow-controller-deployment.yaml index 77df8a591c40..7a42b4d41bd0 100644 --- a/test/e2e/manifests/postgres/overlays/workflow-controller-deployment.yaml +++ b/test/e2e/manifests/postgres/overlays/workflow-controller-deployment.yaml @@ -22,8 +22,11 @@ - op: add path: /spec/template/spec/containers/0/env value: + # these are designed to force offloading on and make GC happen regularly so we shake out any bugs - name: ALWAYS_OFFLOAD_NODE_STATUS value: "true" + - name: OFFLOAD_NODE_STATUS_TTL + value: 30s - name: WORKFLOW_GC_PERIOD value: 30s - name: UPPERIO_DB_DEBUG diff --git a/workflow/controller/config_controller.go b/workflow/controller/config_controller.go index 4dae4d67d4c8..d1c04470234f 100644 --- a/workflow/controller/config_controller.go +++ b/workflow/controller/config_controller.go @@ -71,7 +71,15 @@ func (wfc *WorkflowController) updateConfig(cm *apiv1.ConfigMap) error { } wfc.session = session - wfc.offloadNodeStatusRepo = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName) + if persistence.NodeStatusOffload { + wfc.offloadNodeStatusRepo, err = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName) + if err != nil { + return err + } + log.Info("Node status offloading is enabled") + } else { + log.Info("Node status offloading is disabled") + } if persistence.Archive { wfc.wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName()) log.Info("Workflow archiving is enabled") diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 225514148e39..38fc0131b4d4 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -63,7 +63,9 @@ type WorkflowController struct { wfclientset wfclientset.Interface // datastructures to support the processing of workflows and workflow pods - wfInformer cache.SharedIndexInformer + incompleteWfInformer cache.SharedIndexInformer + // only complete (i.e. not running) workflows + completedWfInformer cache.SharedIndexInformer wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer podInformer cache.SharedIndexInformer wfQueue workqueue.RateLimitingInterface @@ -165,13 +167,15 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in return } - wfc.wfInformer = util.NewWorkflowInformer(wfc.restConfig, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakWorkflowlist) + wfc.incompleteWfInformer = util.NewWorkflowInformer(wfc.restConfig, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.incompleteWorkflowTweakListOptions) + wfc.completedWfInformer = util.NewWorkflowInformer(wfc.restConfig, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.completedWorkflowTweakListOptions) wfc.wftmplInformer = wfc.newWorkflowTemplateInformer() wfc.addWorkflowInformerHandler() wfc.podInformer = wfc.newPodInformer() - go wfc.wfInformer.Run(ctx.Done()) + go wfc.incompleteWfInformer.Run(ctx.Done()) + go wfc.completedWfInformer.Run(ctx.Done()) go wfc.wftmplInformer.Informer().Run(ctx.Done()) go wfc.podInformer.Run(ctx.Done()) go wfc.podLabeler(ctx.Done()) @@ -179,7 +183,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in go wfc.periodicWorkflowGarbageCollector(ctx.Done()) // Wait for all involved caches to be synced, before processing items from the queue is started - for _, informer := range []cache.SharedIndexInformer{wfc.wfInformer, wfc.wftmplInformer.Informer(), wfc.podInformer} { + for _, informer := range []cache.SharedIndexInformer{wfc.incompleteWfInformer, wfc.wftmplInformer.Informer(), wfc.podInformer} { if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { log.Error("Timed out waiting for caches to sync") return @@ -271,21 +275,26 @@ func (wfc *WorkflowController) periodicWorkflowGarbageCollector(stopCh <-chan st continue } if len(oldRecords) == 0 { - log.Info("Zero old records, nothing to do") + log.Info("Zero old offloads, nothing to do") continue } // get every lives workflow (1000s) into a map liveOffloadNodeStatusVersions := make(map[types.UID]string) - list, err := util.NewWorkflowLister(wfc.wfInformer).List() + incomplete, err := util.NewWorkflowLister(wfc.incompleteWfInformer).List() if err != nil { - log.WithField("err", err).Error("Failed to list workflows") + log.WithField("err", err).Error("Failed to list incomplete workflows") continue } - for _, wf := range list { + completed, err := util.NewWorkflowLister(wfc.completedWfInformer).List() + if err != nil { + log.WithField("err", err).Error("Failed to list completed workflows") + continue + } + for _, wf := range append(completed, incomplete...) { // this could be the empty string - as it is no longer offloaded liveOffloadNodeStatusVersions[wf.UID] = wf.Status.OffloadNodeStatusVersion } - log.WithFields(log.Fields{"len_wfs": len(liveOffloadNodeStatusVersions), "len_old_records": len(oldRecords)}).Info("Deleting old UIDs that are not live") + log.WithFields(log.Fields{"len_wfs": len(liveOffloadNodeStatusVersions), "len_old_offloads": len(oldRecords)}).Info("Deleting old offloads that are not live") for _, record := range oldRecords { // this could be empty string nodeStatusVersion, ok := liveOffloadNodeStatusVersions[types.UID(record.UID)] @@ -314,7 +323,7 @@ func (wfc *WorkflowController) processNextItem() bool { } defer wfc.wfQueue.Done(key) - obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key.(string)) + obj, exists, err := wfc.incompleteWfInformer.GetIndexer().GetByKey(key.(string)) if err != nil { log.Errorf("Failed to get workflow '%s' from informer index: %+v", key, err) return true @@ -456,15 +465,22 @@ func (wfc *WorkflowController) processNextPodItem() bool { return true } -func (wfc *WorkflowController) tweakWorkflowlist(options *metav1.ListOptions) { +func (wfc *WorkflowController) incompleteWorkflowTweakListOptions(options *metav1.ListOptions) { + wfc.tweakListOptions(selection.NotIn, options) +} + +func (wfc *WorkflowController) completedWorkflowTweakListOptions(options *metav1.ListOptions) { + wfc.tweakListOptions(selection.In, options) +} + +func (wfc *WorkflowController) tweakListOptions(completedOp selection.Operator, options *metav1.ListOptions) { options.FieldSelector = fields.Everything().String() - // completed notin (true) - incompleteReq, err := labels.NewRequirement(common.LabelKeyCompleted, selection.NotIn, []string{"true"}) + requirement, err := labels.NewRequirement(common.LabelKeyCompleted, completedOp, []string{"true"}) if err != nil { panic(err) } labelSelector := labels.NewSelector(). - Add(*incompleteReq). + Add(*requirement). Add(util.InstanceIDRequirement(wfc.Config.InstanceID)) options.LabelSelector = labelSelector.String() } @@ -492,7 +508,7 @@ func getWfPriority(obj interface{}) (int32, time.Time) { } func (wfc *WorkflowController) addWorkflowInformerHandler() { - wfc.wfInformer.AddEventHandler( + wfc.incompleteWfInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj)