fix(offload): Fix bug which deleted completed workflows. Fixes #2233 (#…
alexec committed Feb 13, 2020
1 parent 47d9a41 commit 4670c99
Showing 11 changed files with 90 additions and 25 deletions.
29 changes: 24 additions & 5 deletions persist/sqldb/offload_node_status_repo.go
@@ -4,7 +4,9 @@ import (
 	"encoding/json"
 	"fmt"
 	"hash/fnv"
+	"os"
 	"strings"
+	"time"
 
 	log "github.com/sirupsen/logrus"
 	"upper.io/db.v3"
@@ -27,8 +29,19 @@ type OffloadNodeStatusRepo interface {
 	IsEnabled() bool
 }
 
-func NewOffloadNodeStatusRepo(session sqlbuilder.Database, clusterName, tableName string) OffloadNodeStatusRepo {
-	return &nodeOffloadRepo{session: session, clusterName: clusterName, tableName: tableName}
+func NewOffloadNodeStatusRepo(session sqlbuilder.Database, clusterName, tableName string) (OffloadNodeStatusRepo, error) {
+	// this environment variable allows you to make Argo Workflows delete offloaded data more or less aggressively,
+	// useful for testing
+	text, ok := os.LookupEnv("OFFLOAD_NODE_STATUS_TTL")
+	if !ok {
+		text = "5m"
+	}
+	ttl, err := time.ParseDuration(text)
+	if err != nil {
+		return nil, err
+	}
+	log.WithField("ttl", ttl).Info("Node status offloading config")
+	return &nodeOffloadRepo{session: session, clusterName: clusterName, tableName: tableName, ttl: ttl}, nil
 }
 
 type nodesRecord struct {
@@ -42,6 +55,8 @@ type nodeOffloadRepo struct {
 	session     sqlbuilder.Database
 	clusterName string
 	tableName   string
+	// time to live - the age at which an offload becomes old
+	ttl time.Duration
 }
 
 func (wdc *nodeOffloadRepo) IsEnabled() bool {
@@ -88,7 +103,7 @@ func (wdc *nodeOffloadRepo) Save(uid, namespace string, nodes wfv1.Nodes) (strin
 		logCtx.WithField("err", err).Info("Ignoring duplicate key error")
 	}
 
-	logCtx.Info("Nodes offloaded, cleaning up old offloads")
+	logCtx.Debug("Nodes offloaded, cleaning up old offloads")
 
 	// This might fail, which is kind of fine (maybe a bug).
 	// It might not delete all records, which is also fine, as we always key on resource version.
@@ -98,7 +113,7 @@
 		Where(db.Cond{"clustername": wdc.clusterName}).
 		And(db.Cond{"uid": uid}).
 		And(db.Cond{"version <>": version}).
-		And("updatedat < current_timestamp - interval '5' minute").
+		And(wdc.oldOffload()).
 		Exec()
 	if err != nil {
 		return "", err
@@ -177,7 +192,7 @@ func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) ([]UUIDVersion, er
 		From(wdc.tableName).
 		Where(db.Cond{"clustername": wdc.clusterName}).
 		And(namespaceEqual(namespace)).
-		And("updatedat < current_timestamp - interval '5' minute").
+		And(wdc.oldOffload()).
 		All(&records)
 	if err != nil {
 		return nil, err
@@ -210,3 +225,7 @@ func (wdc *nodeOffloadRepo) Delete(uid, version string) error {
 	logCtx.WithField("rowsAffected", rowsAffected).Debug("Deleted offloaded nodes")
 	return nil
}
+
+func (wdc *nodeOffloadRepo) oldOffload() string {
+	return fmt.Sprintf("updatedat < current_timestamp - interval '%d' second", int(wdc.ttl.Seconds()))
+}
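The TTL lookup in the constructor and the oldOffload() predicate above together replace the hard-coded five-minute interval. A minimal standalone sketch of that behaviour (the offloadTTL helper and the main function are illustrative, not part of the commit): a missing OFFLOAD_NODE_STATUS_TTL falls back to "5m", a malformed value surfaces as an error instead of being silently ignored, and the parsed duration is rendered into the SQL cutoff in whole seconds.

package main

import (
	"fmt"
	"os"
	"time"
)

func offloadTTL() (time.Duration, error) {
	text, ok := os.LookupEnv("OFFLOAD_NODE_STATUS_TTL")
	if !ok {
		text = "5m" // same default the constructor applies
	}
	return time.ParseDuration(text)
}

func main() {
	os.Setenv("OFFLOAD_NODE_STATUS_TTL", "30s") // the value the e2e manifests below set
	ttl, err := offloadTTL()
	if err != nil {
		panic(err) // e.g. a value like "30x" would fail time.ParseDuration
	}
	// prints: updatedat < current_timestamp - interval '30' second
	fmt.Printf("updatedat < current_timestamp - interval '%d' second\n", int(ttl.Seconds()))
}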
10 changes: 7 additions & 3 deletions server/apiserver/argoserver.go
@@ -124,10 +124,14 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
 		if err != nil {
 			log.Fatal(err)
 		}
-		log.WithField("nodeStatusOffload", persistence.NodeStatusOffload).Info("Offload node status")
-		if persistence.NodeStatusOffload {
-			offloadRepo = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
+		// we always enable node offload, as this is read-only for the Argo Server, i.e. you can turn it off if you
+		// like and the controller won't offload newly created workflows, but you can still read them
+		offloadRepo, err = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
+		if err != nil {
+			log.Fatal(err)
 		}
+		// we always enable the archive for the Argo Server, as the Argo Server does not write records, so you can
+		// disable the archiving - and still read old records
 		wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName())
 	}
 	artifactServer := artifacts.NewArtifactServer(as.authenticator, offloadRepo, wfArchive)
5 changes: 4 additions & 1 deletion test/e2e/fixtures/persistence.go
@@ -38,7 +38,10 @@ func newPersistence(kubeClient kubernetes.Interface) *Persistence {
 		if err != nil {
 			panic(err)
 		}
-		offloadNodeStatusRepo := sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
+		offloadNodeStatusRepo, err := sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
+		if err != nil {
+			panic(err)
+		}
 		workflowArchive := sqldb.NewWorkflowArchive(session, persistence.GetClusterName())
 		return &Persistence{session, offloadNodeStatusRepo, workflowArchive}
 	} else {
2 changes: 2 additions & 0 deletions test/e2e/manifests/mysql.yaml
@@ -467,6 +467,8 @@ spec:
           env:
             - name: ALWAYS_OFFLOAD_NODE_STATUS
               value: "true"
+            - name: OFFLOAD_NODE_STATUS_TTL
+              value: 30s
             - name: WORKFLOW_GC_PERIOD
               value: 30s
             - name: UPPERIO_DB_DEBUG
3 changes: 3 additions & 0 deletions (file name not shown)
@@ -23,8 +23,11 @@
 - op: add
   path: /spec/template/spec/containers/0/env
   value:
+    # these are designed to force offloading on and make GC happen regularly so we shake out any bugs
     - name: ALWAYS_OFFLOAD_NODE_STATUS
       value: "true"
+    - name: OFFLOAD_NODE_STATUS_TTL
+      value: 30s
     - name: WORKFLOW_GC_PERIOD
       value: 30s
     - name: UPPERIO_DB_DEBUG
2 changes: 2 additions & 0 deletions test/e2e/manifests/no-db.yaml
@@ -382,6 +382,8 @@ spec:
           env:
             - name: ALWAYS_OFFLOAD_NODE_STATUS
               value: "true"
+            - name: OFFLOAD_NODE_STATUS_TTL
+              value: 30s
             - name: WORKFLOW_GC_PERIOD
               value: 30s
             - name: UPPERIO_DB_DEBUG
3 changes: 3 additions & 0 deletions (file name not shown)
@@ -23,8 +23,11 @@
 - op: add
   path: /spec/template/spec/containers/0/env
   value:
+    # these are designed to force offloading on and make GC happen regularly so we shake out any bugs
     - name: ALWAYS_OFFLOAD_NODE_STATUS
       value: "true"
+    - name: OFFLOAD_NODE_STATUS_TTL
+      value: 30s
     - name: WORKFLOW_GC_PERIOD
       value: 30s
     - name: UPPERIO_DB_DEBUG
2 changes: 2 additions & 0 deletions test/e2e/manifests/postgres.yaml
@@ -459,6 +459,8 @@ spec:
           env:
             - name: ALWAYS_OFFLOAD_NODE_STATUS
               value: "true"
+            - name: OFFLOAD_NODE_STATUS_TTL
+              value: 30s
             - name: WORKFLOW_GC_PERIOD
               value: 30s
             - name: UPPERIO_DB_DEBUG
3 changes: 3 additions & 0 deletions (file name not shown)
@@ -22,8 +22,11 @@
 - op: add
   path: /spec/template/spec/containers/0/env
   value:
+    # these are designed to force offloading on and make GC happen regularly so we shake out any bugs
     - name: ALWAYS_OFFLOAD_NODE_STATUS
       value: "true"
+    - name: OFFLOAD_NODE_STATUS_TTL
+      value: 30s
     - name: WORKFLOW_GC_PERIOD
       value: 30s
     - name: UPPERIO_DB_DEBUG
10 changes: 9 additions & 1 deletion workflow/controller/config_controller.go
@@ -71,7 +71,15 @@ func (wfc *WorkflowController) updateConfig(cm *apiv1.ConfigMap) error {
 	}
 
 	wfc.session = session
-	wfc.offloadNodeStatusRepo = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
+	if persistence.NodeStatusOffload {
+		wfc.offloadNodeStatusRepo, err = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
+		if err != nil {
+			return err
+		}
+		log.Info("Node status offloading is enabled")
+	} else {
+		log.Info("Node status offloading is disabled")
+	}
 	if persistence.Archive {
 		wfc.wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName())
 		log.Info("Workflow archiving is enabled")
46 changes: 31 additions & 15 deletions workflow/controller/controller.go
@@ -63,7 +63,9 @@ type WorkflowController struct {
 	wfclientset wfclientset.Interface
 
 	// datastructures to support the processing of workflows and workflow pods
-	wfInformer     cache.SharedIndexInformer
+	incompleteWfInformer cache.SharedIndexInformer
+	// only complete (i.e. not running) workflows
+	completedWfInformer cache.SharedIndexInformer
 	wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer
 	podInformer    cache.SharedIndexInformer
 	wfQueue        workqueue.RateLimitingInterface
@@ -165,21 +167,23 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in
 		return
 	}
 
-	wfc.wfInformer = util.NewWorkflowInformer(wfc.restConfig, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakWorkflowlist)
+	wfc.incompleteWfInformer = util.NewWorkflowInformer(wfc.restConfig, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.incompleteWorkflowTweakListOptions)
+	wfc.completedWfInformer = util.NewWorkflowInformer(wfc.restConfig, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.completedWorkflowTweakListOptions)
 	wfc.wftmplInformer = wfc.newWorkflowTemplateInformer()
 
 	wfc.addWorkflowInformerHandler()
 	wfc.podInformer = wfc.newPodInformer()
 
-	go wfc.wfInformer.Run(ctx.Done())
+	go wfc.incompleteWfInformer.Run(ctx.Done())
+	go wfc.completedWfInformer.Run(ctx.Done())
 	go wfc.wftmplInformer.Informer().Run(ctx.Done())
 	go wfc.podInformer.Run(ctx.Done())
 	go wfc.podLabeler(ctx.Done())
 	go wfc.podGarbageCollector(ctx.Done())
 	go wfc.periodicWorkflowGarbageCollector(ctx.Done())
 
 	// Wait for all involved caches to be synced, before processing items from the queue is started
-	for _, informer := range []cache.SharedIndexInformer{wfc.wfInformer, wfc.wftmplInformer.Informer(), wfc.podInformer} {
+	for _, informer := range []cache.SharedIndexInformer{wfc.incompleteWfInformer, wfc.wftmplInformer.Informer(), wfc.podInformer} {
 		if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
 			log.Error("Timed out waiting for caches to sync")
 			return
@@ -271,21 +275,26 @@ func (wfc *WorkflowController) periodicWorkflowGarbageCollector(stopCh <-chan st
 					continue
 				}
 				if len(oldRecords) == 0 {
-					log.Info("Zero old records, nothing to do")
+					log.Info("Zero old offloads, nothing to do")
 					continue
 				}
 				// get every live workflow (1000s) into a map
 				liveOffloadNodeStatusVersions := make(map[types.UID]string)
-				list, err := util.NewWorkflowLister(wfc.wfInformer).List()
+				incomplete, err := util.NewWorkflowLister(wfc.incompleteWfInformer).List()
 				if err != nil {
-					log.WithField("err", err).Error("Failed to list workflows")
+					log.WithField("err", err).Error("Failed to list incomplete workflows")
 					continue
 				}
-				for _, wf := range list {
+				completed, err := util.NewWorkflowLister(wfc.completedWfInformer).List()
+				if err != nil {
+					log.WithField("err", err).Error("Failed to list completed workflows")
+					continue
+				}
+				for _, wf := range append(completed, incomplete...) {
 					// this could be the empty string - as it is no longer offloaded
 					liveOffloadNodeStatusVersions[wf.UID] = wf.Status.OffloadNodeStatusVersion
 				}
-				log.WithFields(log.Fields{"len_wfs": len(liveOffloadNodeStatusVersions), "len_old_records": len(oldRecords)}).Info("Deleting old UIDs that are not live")
+				log.WithFields(log.Fields{"len_wfs": len(liveOffloadNodeStatusVersions), "len_old_offloads": len(oldRecords)}).Info("Deleting old offloads that are not live")
 				for _, record := range oldRecords {
 					// this could be the empty string
 					nodeStatusVersion, ok := liveOffloadNodeStatusVersions[types.UID(record.UID)]
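The remainder of this loop (collapsed above) deletes every old offload whose (UID, version) pair no longer matches a live workflow. Before this commit only incomplete workflows were listed, so offloads belonging to completed-but-still-live workflows looked dead and were wrongly deleted, which is the bug in #2233. A minimal sketch of that keep-or-delete decision, with an assumed record type standing in for the repo's UUIDVersion:

// UIDVersion mirrors the shape of the repo's UUIDVersion records (assumed here for illustration).
type UIDVersion struct {
	UID     string
	Version string
}

// offloadsToDelete keeps an old offload only while some live workflow,
// incomplete or completed, still points at exactly that version.
func offloadsToDelete(oldRecords []UIDVersion, live map[string]string) []UIDVersion {
	var toDelete []UIDVersion
	for _, record := range oldRecords {
		nodeStatusVersion, ok := live[record.UID]
		if !ok || nodeStatusVersion != record.Version {
			toDelete = append(toDelete, record) // no live owner, or a superseded version
		}
	}
	return toDelete
}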
@@ -314,7 +323,7 @@ func (wfc *WorkflowController) processNextItem() bool {
 	}
 	defer wfc.wfQueue.Done(key)
 
-	obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key.(string))
+	obj, exists, err := wfc.incompleteWfInformer.GetIndexer().GetByKey(key.(string))
 	if err != nil {
 		log.Errorf("Failed to get workflow '%s' from informer index: %+v", key, err)
 		return true
@@ -456,15 +465,22 @@ func (wfc *WorkflowController) processNextPodItem() bool {
 	return true
 }
 
-func (wfc *WorkflowController) tweakWorkflowlist(options *metav1.ListOptions) {
+func (wfc *WorkflowController) incompleteWorkflowTweakListOptions(options *metav1.ListOptions) {
+	wfc.tweakListOptions(selection.NotIn, options)
+}
+
+func (wfc *WorkflowController) completedWorkflowTweakListOptions(options *metav1.ListOptions) {
+	wfc.tweakListOptions(selection.In, options)
+}
+
+func (wfc *WorkflowController) tweakListOptions(completedOp selection.Operator, options *metav1.ListOptions) {
 	options.FieldSelector = fields.Everything().String()
-	// completed notin (true)
-	incompleteReq, err := labels.NewRequirement(common.LabelKeyCompleted, selection.NotIn, []string{"true"})
+	requirement, err := labels.NewRequirement(common.LabelKeyCompleted, completedOp, []string{"true"})
 	if err != nil {
 		panic(err)
 	}
 	labelSelector := labels.NewSelector().
-		Add(*incompleteReq).
+		Add(*requirement).
 		Add(util.InstanceIDRequirement(wfc.Config.InstanceID))
 	options.LabelSelector = labelSelector.String()
 }
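For reference, a small runnable sketch of the selectors the two tweak functions above produce (the instance-ID requirement is omitted, and the label key is written out; in the source it is the common.LabelKeyCompleted constant):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
)

func main() {
	const labelKeyCompleted = "workflows.argoproj.io/completed" // assumed value of common.LabelKeyCompleted
	incomplete, _ := labels.NewRequirement(labelKeyCompleted, selection.NotIn, []string{"true"})
	completed, _ := labels.NewRequirement(labelKeyCompleted, selection.In, []string{"true"})
	fmt.Println(labels.NewSelector().Add(*incomplete)) // workflows.argoproj.io/completed notin (true)
	fmt.Println(labels.NewSelector().Add(*completed))  // workflows.argoproj.io/completed in (true)
}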
@@ -492,7 +508,7 @@ func getWfPriority(obj interface{}) (int32, time.Time) {
 }
 
 func (wfc *WorkflowController) addWorkflowInformerHandler() {
-	wfc.wfInformer.AddEventHandler(
+	wfc.incompleteWfInformer.AddEventHandler(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc: func(obj interface{}) {
 				key, err := cache.MetaNamespaceKeyFunc(obj)